Commit 28cac10: Some fixes

AnatolyPopov committed Oct 26, 2023
1 parent 4808dcd
Showing 6 changed files with 57 additions and 31 deletions.
File 1 of 6
@@ -48,7 +48,6 @@

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManagerFactory;
-import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
import io.aiven.kafka.tieredstorage.config.RemoteStorageManagerConfig;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
@@ -453,9 +452,6 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegmentMetadata

final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
-        if (chunkManager instanceof ChunkCache) {
-            ((ChunkCache<?>) chunkManager).startPrefetching(segmentKey, segmentManifest, range.from);
-        }
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
.toInputStream();
} catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
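Context for the removal above: prefetching now starts inside ChunkCache.getChunk (see the next file), so RemoteStorageManager no longer needs to downcast its ChunkManager. A minimal before/after sketch, with hypothetical surrounding variables:

    // Before: the caller special-cased the caching implementation.
    if (chunkManager instanceof ChunkCache) {
        ((ChunkCache<?>) chunkManager).startPrefetching(segmentKey, segmentManifest, range.from);
    }
    // After: any ChunkManager works; a caching implementation prefetches on its own
    // whenever a chunk is read through it.
    final InputStream chunk = chunkManager.getChunk(segmentKey, segmentManifest, chunkId);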
File 2 of 6
@@ -33,7 +33,6 @@
import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
-import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

@@ -72,6 +71,7 @@ protected ChunkCache(final ChunkManager chunkManager) {
public InputStream getChunk(final ObjectKey objectKey,
final SegmentManifest manifest,
final int chunkId) throws StorageBackendException, IOException {
+        startPrefetching(objectKey, manifest, chunkId);
final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
@@ -144,17 +144,15 @@ protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
return cache;
}

-    public void startPrefetching(final ObjectKey segmentKey,
-                                 final SegmentManifest segmentManifest,
-                                 final int startPosition) {
-        final BytesRange prefetchingRange;
-        if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
-            prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE);
-        } else {
-            prefetchingRange = BytesRange.of(startPosition, startPosition + prefetchingSize);
-        }
-        final var chunks = segmentManifest.chunkIndex().listChunksForRange(prefetchingRange);
-        chunks.forEach(chunk -> {
+    private void startPrefetching(final ObjectKey segmentKey,
+                                  final SegmentManifest segmentManifest,
+                                  final int startChunkId) {
+        final var chunks = segmentManifest.chunkIndex().chunks();
+        final var chunksToFetch = chunks.subList(
+                Math.min(startChunkId + 1, chunks.size()),
+                Math.min(startChunkId + 1 + prefetchingSize, chunks.size())
+        );
+        chunksToFetch.forEach(chunk -> {
final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
cache.asMap()
.computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
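The new prefetch window is index-based: starting at the chunk after the one just requested, it takes up to prefetchingSize following chunks, clamped to the end of the segment. A self-contained sketch of that arithmetic (hypothetical demo class; integer ids stand in for the manifest's chunk list):

    import java.util.List;

    public class PrefetchWindowDemo {
        // Same bounds as startPrefetching above: skip the requested chunk,
        // then take at most prefetchingSize chunks, clamped to the list end.
        static List<Integer> window(final List<Integer> chunkIds, final int startChunkId, final int prefetchingSize) {
            return chunkIds.subList(
                    Math.min(startChunkId + 1, chunkIds.size()),
                    Math.min(startChunkId + 1 + prefetchingSize, chunkIds.size()));
        }

        public static void main(final String[] args) {
            final List<Integer> chunks = List.of(0, 1, 2); // a three-chunk segment, as in ChunkCacheTest
            System.out.println(window(chunks, 0, 1)); // [1]    - read chunk 0, prefetch only chunk 1
            System.out.println(window(chunks, 1, 1)); // [2]    - read chunk 1, prefetch chunk 2
            System.out.println(window(chunks, 0, 3)); // [1, 2] - whole-segment prefetch, clamped
        }
    }

With prefetchingSize = 0 (the current default) the window is empty, so prefetching stays off unless explicitly configured.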
File 3 of 6
@@ -33,9 +33,9 @@ public class ChunkCacheConfig extends AbstractConfig {
+ "where \"-1\" represents infinite retention";
private static final long DEFAULT_CACHE_RETENTION_MS = 600_000;

private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.bytes";
private static final String CACHE_PREFETCHING_SIZE_CONFIG = "prefetching.size";
private static final String CACHE_PREFETCHING_SIZE_DOC =
"The amount of data that should be eagerly prefetched and cached";
"The number of chunks to be eagerly prefetched and cached";

private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be

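For illustration, enabling the renamed option alongside the cache settings used in the tests below might look like this (hypothetical values; chunkCache is an existing ChunkCache<?> instance):

    import java.util.Map;

    final Map<String, String> cacheConfig = Map.of(
            "retention.ms", "600000",  // keep cached chunks for 10 minutes
            "size", "-1",              // unbounded cache
            "prefetching.size", "2");  // eagerly fetch the next two chunks after each read
    chunkCache.configure(cacheConfig);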
File 4 of 6
@@ -117,9 +117,9 @@ void invalidPrefetchingSize() {
new ConfigDef(),
Map.of(
"size", "-1",
"prefetching.bytes", "-1"
"prefetching.size", "-1"
)
)).isInstanceOf(ConfigException.class)
.hasMessage("Invalid value -1 for configuration prefetching.bytes: Value must be at least 0");
.hasMessage("Invalid value -1 for configuration prefetching.size: Value must be at least 0");
}
}
File 5 of 6
@@ -27,8 +27,11 @@

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
+import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
+import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;

+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -48,6 +51,7 @@
*/
@ExtendWith(MockitoExtension.class)
class ChunkCacheMetricsTest {
+    private static final ChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10);
static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer();

public static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment";
@@ -79,6 +83,11 @@ private static Stream<Arguments> caches() {
));
}

+    @BeforeEach
+    void setUp() {
+        when(segmentManifest.chunkIndex()).thenReturn(FIXED_SIZE_CHUNK_INDEX);
+    }

@ParameterizedTest(name = "Cache {0}")
@MethodSource("caches")
void shouldRecordMetrics(final Class<ChunkCache<?>> chunkCacheClass, final Map<String, ?> config)
File 6 of 6
@@ -50,10 +50,12 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.description;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mockingDetails;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -67,7 +69,8 @@ class ChunkCacheTest {

private static final byte[] CHUNK_0 = "0123456789".getBytes();
private static final byte[] CHUNK_1 = "1011121314".getBytes();
-    private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 20, 10, 10);
+    private static final byte[] CHUNK_2 = "1011121314".getBytes();
+    private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 30, 10, 10);
private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder()
.add(IndexType.OFFSET, 1)
.add(IndexType.TIMESTAMP, 1)
@@ -108,6 +111,8 @@ void setUp() throws Exception {
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0));
when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1));
+        when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2))
+            .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_2));
}

@Test
@@ -204,43 +209,61 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
}

@Test
-    void prefetching() throws Exception {
+    void prefetchingNextChunk() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.bytes", "20" // both chunks
"prefetching.size", "1"
));
-        chunkCache.startPrefetching(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
+        chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2);
-        verify(chunkManager, times(2)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt());
+        verify(chunkManager, description("first chunk was fetched from remote"))
+            .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
+        verify(chunkManager, description("second chunk was prefetched"))
+            .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
+        verify(chunkManager, never().description("third chunk was not prefetched "))
+            .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);

final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

+        // checking that the third chunk is prefetched when fetching chunk 1 from the cache
final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
await("waiting for prefetching to finish").pollInterval(Duration.ofMillis(5))
.until(() -> chunkCache.statsCounter.snapshot().loadCount() == 5);
verify(chunkManager, description("third chunk was prefetched"))
.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);
verifyNoMoreInteractions(chunkManager);
}

@Test
-    void prefetchingFirstChunk() throws Exception {
+    void prefetchingWholeSegment() throws Exception {
chunkCache.configure(Map.of(
"retention.ms", "-1",
"size", "-1",
"prefetching.bytes", "10" // both chunks
"prefetching.size", SEGMENT_MANIFEST.chunkIndex().chunks().size()
));
-        chunkCache.startPrefetching(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
-        await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 1);
-        verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
+        chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
+        await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 3);
+        // verifying fetching for all 3 chunks (2 prefetched)
+        verify(chunkManager, times(3)).getChunk(any(), any(), anyInt());

+        // no fetching from remote since chunk 0 is cached
final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0);
assertThat(cachedChunk0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(chunkManager);

+        // no fetching from remote since chunk 1 is cached
final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
assertThat(cachedChunk1).hasBinaryContent(CHUNK_1);
-        verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1);
verifyNoMoreInteractions(chunkManager);

+        // no fetching from remote since chunk 2 is cached
+        final InputStream cachedChunk2 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2);
+        assertThat(cachedChunk2).hasBinaryContent(CHUNK_2);
+        verifyNoMoreInteractions(chunkManager);
}
}
