diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java index 45cdc93dc..35c5ce302 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java @@ -33,6 +33,7 @@ import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; +import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -71,7 +72,8 @@ protected ChunkCache(final ChunkManager chunkManager) { public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest, final int chunkId) throws StorageBackendException, IOException { - startPrefetching(objectKey, manifest, chunkId); + final var currentChunk = manifest.chunkIndex().chunks().get(chunkId); + startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize); final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId); final AtomicReference result = new AtomicReference<>(); try { @@ -145,25 +147,29 @@ protected AsyncCache buildCache(final ChunkCacheConfig config) { } private void startPrefetching(final ObjectKey segmentKey, - final SegmentManifest segmentManifest, - final int startChunkId) { - final var chunks = segmentManifest.chunkIndex().chunks(); - final var chunksToFetch = chunks.subList( - Math.min(startChunkId + 1, chunks.size()), - Math.min(startChunkId + 1 + prefetchingSize, chunks.size()) - ); - chunksToFetch.forEach(chunk -> { - final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id); - cache.asMap() - .computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> { - try { - final InputStream chunkStream = - chunkManager.getChunk(segmentKey, segmentManifest, chunk.id); - return this.cacheChunk(chunkKey, chunkStream); - } catch (final StorageBackendException | IOException e) { - throw new CompletionException(e); - } - }, executor)); - }); + final SegmentManifest segmentManifest, + final int startPosition) { + if (prefetchingSize > 0) { + final BytesRange prefetchingRange; + if (Integer.MAX_VALUE - startPosition < prefetchingSize) { + prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE); + } else { + prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize); + } + final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange); + chunks.forEach(chunk -> { + final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id); + cache.asMap() + .computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> { + try { + final InputStream chunkStream = + chunkManager.getChunk(segmentKey, segmentManifest, chunk.id); + return this.cacheChunk(chunkKey, chunkStream); + } catch (final StorageBackendException | IOException e) { + throw new CompletionException(e); + } + }, executor)); + }); + } } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java index 67e230396..1b5e097f2 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java @@ -33,9 +33,9 @@ public class ChunkCacheConfig extends AbstractConfig { + "where \"-1\" represents infinite retention"; private static final long DEFAULT_CACHE_RETENTION_MS = 600_000; - private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.size"; - private static final String CACHE_PREFETCHING_SIZE_DOC = - "The number of chunks to be eagerly prefetched and cached"; + private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = "prefetch.max.size"; + private static final String CACHE_PREFETCH_MAX_SIZE_DOC = + "The amount of data that should be eagerly prefetched and cached"; private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be @@ -57,12 +57,12 @@ private static ConfigDef addCacheConfigs(final ConfigDef configDef) { CACHE_RETENTION_DOC ); configDef.define( - CACHE_PREFETCHING_SIZE_CONFIG, + CACHE_PREFETCH_MAX_SIZE_CONFIG, ConfigDef.Type.INT, CACHE_PREFETCHING_SIZE_DEFAULT, ConfigDef.Range.between(0, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, - CACHE_PREFETCHING_SIZE_DOC + CACHE_PREFETCH_MAX_SIZE_DOC ); return configDef; } @@ -88,6 +88,6 @@ public Optional cacheRetention() { } public int cachePrefetchingSize() { - return getInt(CACHE_PREFETCHING_SIZE_CONFIG); + return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG); } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/AbstractChunkIndex.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/AbstractChunkIndex.java index d4d4dca2f..6bffd5e59 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/AbstractChunkIndex.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/AbstractChunkIndex.java @@ -20,6 +20,7 @@ import java.util.List; import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.storage.BytesRange; import com.fasterxml.jackson.annotation.JsonProperty; @@ -108,6 +109,17 @@ public Chunk findChunkForOriginalOffset(final int offset) { ); } + @Override + public List chunksForRange(final BytesRange bytesRange) { + Chunk current; + final var result = new ArrayList(); + for (int i = bytesRange.from; i <= bytesRange.to && i < originalFileSize; i += current.originalSize) { + current = findChunkForOriginalOffset(i); + result.add(current); + } + return result; + } + private int originalChunkSize(final int chunkI) { final boolean isFinalChunk = chunkI == chunkCount - 1; return isFinalChunk ? (originalFileSize - (chunkCount - 1) * originalChunkSize) : originalChunkSize; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/ChunkIndex.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/ChunkIndex.java index 84e7efe8b..4273a9a25 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/ChunkIndex.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/index/ChunkIndex.java @@ -19,6 +19,7 @@ import java.util.List; import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.storage.BytesRange; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -49,4 +50,6 @@ public interface ChunkIndex { * Returns all chunks in the index. */ List chunks(); + + List chunksForRange(BytesRange bytesRange); } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java index ea9afab6d..d87bfb4be 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java @@ -118,9 +118,9 @@ void invalidPrefetchingSize() { new ConfigDef(), Map.of( "size", "-1", - "prefetching.size", "-1" + "prefetch.max.size", "-1" ) )).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value -1 for configuration prefetching.size: Value must be at least 0"); + .hasMessage("Invalid value -1 for configuration prefetch.max.size: Value must be at least 0"); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java index 1c0d54833..981aed359 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java @@ -70,7 +70,14 @@ class ChunkCacheTest { private static final byte[] CHUNK_0 = "0123456789".getBytes(); private static final byte[] CHUNK_1 = "1011121314".getBytes(); private static final byte[] CHUNK_2 = "1011121314".getBytes(); - private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 30, 10, 10); + private static final int ORIGINAL_CHUNK_SIZE = 10; + private static final int ORIGINAL_FILE_SIZE = 30; + private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex( + ORIGINAL_CHUNK_SIZE, + ORIGINAL_FILE_SIZE, + 10, + 10 + ); private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder() .add(IndexType.OFFSET, 1) .add(IndexType.TIMESTAMP, 1) @@ -213,7 +220,7 @@ void prefetchingNextChunk() throws Exception { chunkCache.configure(Map.of( "retention.ms", "-1", "size", "-1", - "prefetching.size", "1" + "prefetch.max.size", ORIGINAL_CHUNK_SIZE )); chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2); @@ -243,7 +250,7 @@ void prefetchingWholeSegment() throws Exception { chunkCache.configure(Map.of( "retention.ms", "-1", "size", "-1", - "prefetching.size", SEGMENT_MANIFEST.chunkIndex().chunks().size() + "prefetch.max.size", ORIGINAL_FILE_SIZE - ORIGINAL_CHUNK_SIZE )); chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 3);