Skip to content

Commit

Permalink
Prefetching a configurable amount of data.
Browse files Browse the repository at this point in the history
  • Loading branch information
AnatolyPopov committed Oct 20, 2023
1 parent ccce813 commit 41f01af
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManagerFactory;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.config.RemoteStorageManagerConfig;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
Expand Down Expand Up @@ -452,7 +453,9 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme

final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);

if (chunkManager instanceof ChunkCache) {
((ChunkCache<?>) chunkManager).startPrefetching(segmentKey, segmentManifest, range.from);
}
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

Expand All @@ -53,6 +55,8 @@ public abstract class ChunkCache<T> implements ChunkManager, Configurable {

protected AsyncCache<ChunkKey, T> cache;

private int prefetchingSize;

protected ChunkCache(final ChunkManager chunkManager) {
this.chunkManager = chunkManager;
this.statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
Expand Down Expand Up @@ -128,6 +132,7 @@ public InputStream getChunk(final ObjectKey objectKey,
public abstract Weigher<ChunkKey, T> weigher();

protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
this.prefetchingSize = config.cachePrefetchingSize();
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
Expand All @@ -139,4 +144,29 @@ protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
return cache;
}

public void startPrefetching(final ObjectKey segmentKey,
final SegmentManifest segmentManifest,
final int startPosition) {
final BytesRange prefetchingRange;
if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE);
} else {
prefetchingRange = BytesRange.of(startPosition, startPosition + prefetchingSize);
}
final var chunks = segmentManifest.chunkIndex().listChunksForRange(prefetchingRange);
chunks.forEach(chunk -> {
final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
cache.asMap()
.computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
try {
final InputStream chunkStream =
chunkManager.getChunk(segmentKey, segmentManifest, chunk.id);
return this.cacheChunk(chunkKey, chunkStream);
} catch (final StorageBackendException | IOException e) {
throw new CompletionException(e);
}
}, executor));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class ChunkCacheConfig extends AbstractConfig {
+ "where \"-1\" represents infinite retention";
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;

private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.bytes";
private static final String CACHE_PREFETCHING_SIZE_DOC =
"The amount of data that should be eagerly prefetched and cached";

private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be

private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
configDef.define(
CACHE_SIZE_CONFIG,
Expand All @@ -50,6 +56,14 @@ private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
ConfigDef.Importance.MEDIUM,
CACHE_RETENTION_DOC
);
configDef.define(
CACHE_PREFETCHING_SIZE_CONFIG,
ConfigDef.Type.INT,
CACHE_PREFETCHING_SIZE_DEFAULT,
ConfigDef.Range.between(0, Integer.MAX_VALUE),
ConfigDef.Importance.MEDIUM,
CACHE_PREFETCHING_SIZE_DOC
);
return configDef;
}

Expand All @@ -72,4 +86,8 @@ public Optional<Duration> cacheRetention() {
}
return Optional.of(Duration.ofMillis(rawValue));
}

public int cachePrefetchingSize() {
return getInt(CACHE_PREFETCHING_SIZE_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

import com.fasterxml.jackson.annotation.JsonProperty;

Expand Down Expand Up @@ -108,6 +109,17 @@ public Chunk findChunkForOriginalOffset(final int offset) {
);
}

@Override
public List<Chunk> listChunksForRange(final BytesRange bytesRange) {
Chunk current;
final var result = new ArrayList<Chunk>();
for (int i = bytesRange.from; i < bytesRange.to && i < originalFileSize; i += current.originalSize) {
current = findChunkForOriginalOffset(i);
result.add(current);
}
return result;
}

private int originalChunkSize(final int chunkI) {
final boolean isFinalChunk = chunkI == chunkCount - 1;
return isFinalChunk ? (originalFileSize - (chunkCount - 1) * originalChunkSize) : originalChunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;

import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.storage.BytesRange;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
Expand Down Expand Up @@ -49,4 +50,6 @@ public interface ChunkIndex {
* Returns all chunks in the index.
*/
List<Chunk> chunks();

List<Chunk> listChunksForRange(BytesRange bytesRange);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ void invalidRetention() {
)).isInstanceOf(ConfigException.class)
.hasMessage("Invalid value -2 for configuration retention.ms: Value must be at least -1");
}

@Test
void invalidPrefetchingSize() {
assertThatThrownBy(() -> new ChunkCacheConfig(
new ConfigDef(),
Map.of(
"size", "-1",
"prefetching.bytes", "-1"
)
)).isInstanceOf(ConfigException.class)
.hasMessage("Invalid value -1 for configuration prefetching.bytes: Value must be at least 0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,45 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt());
}

@Test
void prefetching() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.bytes", "20" // both chunks
));
chunkCache.startPrefetching(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2);
verify(chunkManager, times(2)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt());

final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(chunkManager);
}

@Test
void prefetchingFirstChunk() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.bytes", "10" // both chunks
));
chunkCache.startPrefetching(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 1);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);

final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
}
}

@Nested
Expand Down

0 comments on commit 41f01af

Please sign in to comment.