Prefetching a configurable amount of data. #394

Merged · 3 commits · Oct 30, 2023

This change adds a prefetching.size option to the chunk caches: when a chunk is fetched, up to that many subsequent chunks of the same segment are eagerly loaded into the cache in the background.
Changes from 2 commits
@@ -452,7 +452,6 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegmentMetadata

final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
@@ -53,6 +53,8 @@ public abstract class ChunkCache<T> implements ChunkManager, Configurable {

protected AsyncCache<ChunkKey, T> cache;

private int prefetchingSize;

protected ChunkCache(final ChunkManager chunkManager) {
this.chunkManager = chunkManager;
this.statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
@@ -69,6 +71,7 @@ protected ChunkCache(final ChunkManager chunkManager) {
public InputStream getChunk(final ObjectKey objectKey,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException {
startPrefetching(objectKey, manifest, chunkId);
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
@@ -128,6 +131,7 @@ public InputStream getChunk(final ObjectKey objectKey,
public abstract Weigher<ChunkKey, T> weigher();

protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
this.prefetchingSize = config.cachePrefetchingSize();
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
@@ -139,4 +143,27 @@ protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
return cache;
}

private void startPrefetching(final ObjectKey segmentKey,
final SegmentManifest segmentManifest,
final int startChunkId) {
final var chunks = segmentManifest.chunkIndex().chunks();
final var chunksToFetch = chunks.subList(
Math.min(startChunkId + 1, chunks.size()),
Math.min(startChunkId + 1 + prefetchingSize, chunks.size())
);
chunksToFetch.forEach(chunk -> {
final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
cache.asMap()
.computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
try {
final InputStream chunkStream =
chunkManager.getChunk(segmentKey, segmentManifest, chunk.id);
return this.cacheChunk(chunkKey, chunkStream);
} catch (final StorageBackendException | IOException e) {
throw new CompletionException(e);
}
}, executor));
});
}
}
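
A note on the pattern above: AsyncCache.asMap() exposes the cache as a ConcurrentMap<ChunkKey, CompletableFuture<T>>, so computeIfAbsent maps each key to at most one in-flight future, and a prefetch racing with a concurrent reader of the same chunk results in a single remote load. A minimal self-contained sketch of the same idea (the Integer key, fetchRemote and the pool size are illustrative stand-ins, not part of this PR):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

class PrefetchingSketch {
    private final AsyncCache<Integer, byte[]> cache = Caffeine.newBuilder().buildAsync();
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Schedule an asynchronous load unless the chunk is already cached or in flight.
    void prefetch(final int chunkId) {
        cache.asMap().computeIfAbsent(chunkId, id ->
            CompletableFuture.supplyAsync(() -> fetchRemote(id), executor));
    }

    // A reader of the same key joins the in-flight future instead of loading twice.
    byte[] read(final int chunkId) {
        return cache.asMap().computeIfAbsent(chunkId, id ->
            CompletableFuture.supplyAsync(() -> fetchRemote(id), executor))
            .join();
    }

    private byte[] fetchRemote(final int chunkId) {
        return new byte[10]; // stand-in for chunkManager.getChunk(...)
    }
}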
@@ -33,6 +33,12 @@ public class ChunkCacheConfig extends AbstractConfig {
+ "where \"-1\" represents infinite retention";
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;

private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.size";
private static final String CACHE_PREFETCHING_SIZE_DOC =
"The number of chunks to be eagerly prefetched and cached";

private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be

private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
configDef.define(
CACHE_SIZE_CONFIG,
@@ -50,6 +56,14 @@ private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
configDef.define(
CACHE_PREFETCHING_SIZE_CONFIG,
ConfigDef.Type.INT,
CACHE_PREFETCHING_SIZE_DEFAULT,
ConfigDef.Range.between(0, Integer.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_PREFETCHING_SIZE_DOC
);
return configDef;
}

@@ -72,4 +86,8 @@ public Optional<Duration> cacheRetention() {
}
return Optional.of(Duration.ofMillis(rawValue));
}

public int cachePrefetchingSize() {
return getInt(CACHE_PREFETCHING_SIZE_CONFIG);
}
}
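
Prefetching stays disabled by default (prefetching.size = 0) and is enabled through the same map-based configuration the caches already accept, as in the tests below. A sketch of enabling it (the keys are exactly those defined above; the surrounding property prefix depends on where the cache config is nested):

// Prefetch two chunks ahead of every fetched chunk; unbounded size and retention.
chunkCache.configure(Map.of(
    "retention.ms", "-1",
    "size", "-1",
    "prefetching.size", "2"
));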
@@ -20,6 +20,7 @@
import java.util.List;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

import com.fasterxml.jackson.annotation.JsonProperty;

@@ -108,6 +109,17 @@ public Chunk findChunkForOriginalOffset(final int offset) {
);
}

@Override
public List<Chunk> listChunksForRange(final BytesRange bytesRange) {
    final var result = new ArrayList<Chunk>();
    int offset = bytesRange.from;
    while (offset < bytesRange.to && offset < originalFileSize) {
        final Chunk current = findChunkForOriginalOffset(offset);
        result.add(current);
        // Advance to the start of the next chunk: stepping by the chunk size from a
        // mid-chunk offset could overshoot the end of the range and skip its last chunk.
        offset = current.originalPosition + current.originalSize;
    }
    return result;
}
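
For intuition: with the fixed-size index used in the tests (chunk size 10, original file size 30), a range that starts inside chunk 0 and ends inside chunk 2 must yield all three chunks. A hypothetical check, assuming a BytesRange(from, to) constructor whose to bound is exclusive, which is how the loop above treats it:

final ChunkIndex index = new FixedSizeChunkIndex(10, 30, 10, 10);
// Bytes 5..24 touch chunk 0 (bytes 0..9), chunk 1 (10..19) and chunk 2 (20..29).
final List<Chunk> covered = index.listChunksForRange(new BytesRange(5, 25));
// covered now holds the chunks with ids 0, 1 and 2.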

private int originalChunkSize(final int chunkI) {
final boolean isFinalChunk = chunkI == chunkCount - 1;
return isFinalChunk ? (originalFileSize - (chunkCount - 1) * originalChunkSize) : originalChunkSize;
@@ -19,6 +19,7 @@
import java.util.List;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -49,4 +50,6 @@ public interface ChunkIndex {
* Returns all chunks in the index.
*/
List<Chunk> chunks();

/**
 * Returns the chunks containing bytes of the given range.
 */
List<Chunk> listChunksForRange(BytesRange bytesRange);
}
@@ -110,4 +110,16 @@ void invalidRetention() {
)).isInstanceOf(ConfigException.class)
.hasMessage("Invalid value -2 for configuration retention.ms: Value must be at least -1");
}

@Test
void invalidPrefetchingSize() {
assertThatThrownBy(() -> new ChunkCacheConfig(
new ConfigDef(),
Map.of(
"size", "-1",
"prefetching.size", "-1"
)
)).isInstanceOf(ConfigException.class)
.hasMessage("Invalid value -1 for configuration prefetching.size: Value must be at least 0");
}
}
@@ -27,8 +27,11 @@

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -48,6 +51,7 @@
*/
@ExtendWith(MockitoExtension.class)
class ChunkCacheMetricsTest {
private static final ChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10);
static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer();

public static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment";
@@ -79,6 +83,11 @@ private static Stream<Arguments> caches() {
));
}

@BeforeEach
void setUp() {
when(segmentManifest.chunkIndex()).thenReturn(FIXED_SIZE_CHUNK_INDEX);
}

@ParameterizedTest(name = "Cache {0}")
@MethodSource("caches")
void shouldRecordMetrics(final Class<ChunkCache<?>> chunkCacheClass, final Map<String, ?> config)
@@ -50,10 +50,12 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.description;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -67,7 +69,8 @@ class ChunkCacheTest {

private static final byte[] CHUNK_0 = "0123456789".getBytes();
private static final byte[] CHUNK_1 = "1011121314".getBytes();
private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10);
private static final byte[] CHUNK_2 = "1516171819".getBytes();
private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 30, 10, 10);
private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder()
.add(IndexType.OFFSET, 1)
.add(IndexType.TIMESTAMP, 1)
@@ -108,6 +111,8 @@ void setUp() throws Exception {
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0));
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1));
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_2));
}

@Test
@@ -203,6 +208,63 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt());
}

@Test
void prefetchingNextChunk() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.size", "1"
));
chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2);
verify(chunkManager, description("first chunk was fetched from remote"))
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
verify(chunkManager, description("second chunk was prefetched"))
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
verify(chunkManager, never().description("third chunk was not prefetched"))
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);

final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

// check that the third chunk is prefetched when chunk 1 is served from the cache
final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
await("waiting for prefetching to finish").pollInterval(Duration.ofMillis(5))
.until(() -> chunkCache.statsCounter.snapshot().loadCount() == 5);
verify(chunkManager, description("third chunk was prefetched"))
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);
verifyNoMoreInteractions(chunkManager);
}

@Test
void prefetchingWholeSegment() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.size", SEGMENT_MANIFEST.chunkIndex().chunks().size()
));
chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 3);
// verify that all 3 chunks were fetched (2 of them prefetched)
verify(chunkManager, times(3)).getChunk(any(), any(), anyInt());

// no fetching from remote since chunk 0 is cached
final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

// no fetching from remote since chunk 1 is cached
final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(chunkManager);

// no fetching from remote since chunk 2 is cached
final InputStream cachedChunk2 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);
assertThat(cachedChunk2).hasBinaryContent(CHUNK_2);
verifyNoMoreInteractions(chunkManager);
}
}

@Nested