Prefetching by the number of bytes #429

Merged 1 commit on Oct 31, 2023.
ChunkCache.java
@@ -33,6 +33,7 @@
 import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
@@ -71,7 +72,8 @@ protected ChunkCache(final ChunkManager chunkManager) {
     public InputStream getChunk(final ObjectKey objectKey,
                                 final SegmentManifest manifest,
                                 final int chunkId) throws StorageBackendException, IOException {
-        startPrefetching(objectKey, manifest, chunkId);
+        final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
+        startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
         final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
         final AtomicReference<InputStream> result = new AtomicReference<>();
         try {
@@ -145,25 +147,29 @@ protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
     }
 
     private void startPrefetching(final ObjectKey segmentKey,
-                                  final SegmentManifest segmentManifest,
-                                  final int startChunkId) {
-        final var chunks = segmentManifest.chunkIndex().chunks();
-        final var chunksToFetch = chunks.subList(
-            Math.min(startChunkId + 1, chunks.size()),
-            Math.min(startChunkId + 1 + prefetchingSize, chunks.size())
-        );
-        chunksToFetch.forEach(chunk -> {
-            final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
-            cache.asMap()
-                .computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
-                    try {
-                        final InputStream chunkStream =
-                            chunkManager.getChunk(segmentKey, segmentManifest, chunk.id);
-                        return this.cacheChunk(chunkKey, chunkStream);
-                    } catch (final StorageBackendException | IOException e) {
-                        throw new CompletionException(e);
-                    }
-                }, executor));
-        });
+                                  final SegmentManifest segmentManifest,
+                                  final int startPosition) {
+        if (prefetchingSize > 0) {
+            final BytesRange prefetchingRange;
+            if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
+                prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE);
+            } else {
+                prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize);
+            }
+            final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange);
+            chunks.forEach(chunk -> {
+                final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
+                cache.asMap()
+                    .computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
+                        try {
+                            final InputStream chunkStream =
+                                chunkManager.getChunk(segmentKey, segmentManifest, chunk.id);
+                            return this.cacheChunk(chunkKey, chunkStream);
+                        } catch (final StorageBackendException | IOException e) {
+                            throw new CompletionException(e);
+                        }
+                    }, executor));
+            });
+        }
     }
 }
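In effect, prefetching is now driven by a byte range rather than a chunk count: the range starts at the first byte after the requested chunk (currentChunk.originalPosition + currentChunk.originalSize) and spans prefetchingSize bytes, with a guard so the upper bound never overflows int. Below is a minimal sketch of that range arithmetic, using a plain int pair in place of the project's BytesRange; the class and helper names are illustrative, and the inclusive upper bound is an assumption consistent with the tests further down.

final class PrefetchRangeSketch {
    // Returns {from, to}, both inclusive, clamping `to` at Integer.MAX_VALUE
    // instead of letting `from + size - 1` overflow.
    static int[] prefetchRange(final int startPosition, final int prefetchMaxSize) {
        if (Integer.MAX_VALUE - startPosition < prefetchMaxSize) {
            return new int[] {startPosition, Integer.MAX_VALUE};
        }
        return new int[] {startPosition, startPosition + prefetchMaxSize - 1};
    }

    public static void main(final String[] args) {
        // Reading a 10-byte chunk that ends at byte 9, with prefetch.max.size = 10:
        final int[] range = prefetchRange(10, 10);
        System.out.println(range[0] + ".." + range[1]); // 10..19, exactly the next chunk
    }
}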
ChunkCacheConfig.java
@@ -33,9 +33,9 @@ public class ChunkCacheConfig extends AbstractConfig {
         + "where \"-1\" represents infinite retention";
     private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;
 
-    private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.size";
-    private static final String CACHE_PREFETCHING_SIZE_DOC =
-        "The number of chunks to be eagerly prefetched and cached";
+    private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = "prefetch.max.size";
+    private static final String CACHE_PREFETCH_MAX_SIZE_DOC =
+        "The amount of data that should be eagerly prefetched and cached";
 
     private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be
 
@@ -57,12 +57,12 @@ private static ConfigDef addCacheConfigs(final ConfigDef configDef) {
             CACHE_RETENTION_DOC
         );
         configDef.define(
-            CACHE_PREFETCHING_SIZE_CONFIG,
+            CACHE_PREFETCH_MAX_SIZE_CONFIG,
             ConfigDef.Type.INT,
             CACHE_PREFETCHING_SIZE_DEFAULT,
             ConfigDef.Range.between(0, Integer.MAX_VALUE),
             ConfigDef.Importance.MEDIUM,
-            CACHE_PREFETCHING_SIZE_DOC
+            CACHE_PREFETCH_MAX_SIZE_DOC
        );
        return configDef;
@@ -88,6 +88,6 @@ public Optional<Duration> cacheRetention() {
     }
 
     public int cachePrefetchingSize() {
-        return getInt(CACHE_PREFETCHING_SIZE_CONFIG);
+        return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG);
     }
 }
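For reference, a hedged configuration sketch using the renamed property; it mirrors the Map-based configure(...) calls in the tests further down, and the 4 MiB value is only an example, not a recommendation.

import java.util.Map;

// Assumes an already-constructed ChunkCache instance `chunkCache`.
final Map<String, ?> config = Map.of(
        "retention.ms", "-1",                  // "-1": infinite retention
        "size", "-1",                          // "-1": unbounded cache
        "prefetch.max.size", 4 * 1024 * 1024   // prefetch up to 4 MiB past each read
);
chunkCache.configure(config);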
ChunkIndex implementation
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -108,6 +109,17 @@ public Chunk findChunkForOriginalOffset(final int offset) {
         );
     }
 
+    @Override
+    public List<Chunk> chunksForRange(final BytesRange bytesRange) {
+        Chunk current;
+        final var result = new ArrayList<Chunk>();
+        for (int i = bytesRange.from; i <= bytesRange.to && i < originalFileSize; i += current.originalSize) {
+            current = findChunkForOriginalOffset(i);
+            result.add(current);
+        }
+        return result;
+    }
+
     private int originalChunkSize(final int chunkI) {
         final boolean isFinalChunk = chunkI == chunkCount - 1;
         return isFinalChunk ? (originalFileSize - (chunkCount - 1) * originalChunkSize) : originalChunkSize;
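The walk in chunksForRange starts at bytesRange.from, adds the chunk containing that offset, then jumps forward by that chunk's originalSize until it passes bytesRange.to or originalFileSize. A standalone sketch with the same fixed-size geometry as the tests below (chunk size 10, file size 30); the class and method names are illustrative, and chunk ids are simplified to offset / chunkSize rather than going through findChunkForOriginalOffset.

import java.util.ArrayList;
import java.util.List;

final class ChunksForRangeSketch {
    static List<Integer> chunkIdsForRange(final int from, final int to) {
        final int chunkSize = 10;
        final int fileSize = 30;
        final List<Integer> ids = new ArrayList<>();
        // Same walk as chunksForRange above: visit the chunk containing i,
        // then jump past it; stop past the range end or the end of the file.
        for (int i = from; i <= to && i < fileSize; i += chunkSize) {
            ids.add(i / chunkSize);
        }
        return ids;
    }

    public static void main(final String[] args) {
        System.out.println(chunkIdsForRange(10, 19)); // [1]    - exactly the next chunk
        System.out.println(chunkIdsForRange(10, 29)); // [1, 2] - the rest of the segment
        System.out.println(chunkIdsForRange(25, 80)); // [2]    - clamped at the file size
    }
}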
ChunkIndex.java
@@ -19,6 +19,7 @@
 import java.util.List;
 
 import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -49,4 +50,6 @@ public interface ChunkIndex {
      * Returns all chunks in the index.
      */
     List<Chunk> chunks();
+
+    List<Chunk> chunksForRange(BytesRange bytesRange);
 }
ChunkCacheConfig test
@@ -118,9 +118,9 @@ void invalidPrefetchingSize() {
             new ConfigDef(),
             Map.of(
                 "size", "-1",
-                "prefetching.size", "-1"
+                "prefetch.max.size", "-1"
             )
         )).isInstanceOf(ConfigException.class)
-            .hasMessage("Invalid value -1 for configuration prefetching.size: Value must be at least 0");
+            .hasMessage("Invalid value -1 for configuration prefetch.max.size: Value must be at least 0");
     }
 }
ChunkCacheTest.java
@@ -70,7 +70,14 @@ class ChunkCacheTest {
     private static final byte[] CHUNK_0 = "0123456789".getBytes();
     private static final byte[] CHUNK_1 = "1011121314".getBytes();
     private static final byte[] CHUNK_2 = "1011121314".getBytes();
-    private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 30, 10, 10);
+    private static final int ORIGINAL_CHUNK_SIZE = 10;
+    private static final int ORIGINAL_FILE_SIZE = 30;
+    private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(
+        ORIGINAL_CHUNK_SIZE,
+        ORIGINAL_FILE_SIZE,
+        10,
+        10
+    );
     private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder()
         .add(IndexType.OFFSET, 1)
         .add(IndexType.TIMESTAMP, 1)
@@ -213,7 +220,7 @@ void prefetchingNextChunk() throws Exception {
         chunkCache.configure(Map.of(
                 "retention.ms", "-1",
                 "size", "-1",
-                "prefetching.size", "1"
+                "prefetch.max.size", ORIGINAL_CHUNK_SIZE
         ));
         chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
         await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2);
@@ -243,7 +250,7 @@ void prefetchingWholeSegment() throws Exception {
         chunkCache.configure(Map.of(
                 "retention.ms", "-1",
                 "size", "-1",
-                "prefetching.size", SEGMENT_MANIFEST.chunkIndex().chunks().size()
+                "prefetch.max.size", ORIGINAL_FILE_SIZE - ORIGINAL_CHUNK_SIZE
         ));
         chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
         await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 3);
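With the byte-based semantics, the updated test expectations follow directly (assuming the inclusive range bounds sketched earlier): in prefetchingNextChunk, reading chunk 0 (bytes 0 through 9) with prefetch.max.size = 10 yields the range [10, 19], which maps to chunk 1 alone, hence two cache loads; in prefetchingWholeSegment, prefetch.max.size = 20 yields [10, 29], covering chunks 1 and 2, hence three loads.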