Skip to content

Commit

Permalink
fixup! refactor: rename chunk manager into fetch manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 18, 2023
1 parent 801e894 commit ad355e6
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public DefaultFetchManager(final ObjectFetcher fetcher, final AesEncryptionProvi
* @return an {@link InputStream} of the fetch part, plain text (i.e., decrypted and decompressed).
*/
@Override
public InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException {
public InputStream fetchPartContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException {
final InputStream partContent = fetcher.fetch(objectKey, part.range);

DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(partContent, part.chunks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public InputStream nextElement() {

private InputStream partChunks(final FetchPart part) {
try {
return fetchManager.partContent(objectKey, manifest, part);
return fetchManager.fetchPartContent(objectKey, manifest, part);
} catch (final KeyNotFoundException e) {
throw new KeyNotFoundRuntimeException(e);
} catch (final StorageBackendException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

public interface FetchManager {
InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part)
InputStream fetchPartContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part)
throws StorageBackendException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ protected FetchCache(final FetchManager fetchManager) {
* the InputStream will still contain the data.
*/
@Override
public InputStream partContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException, IOException {
public InputStream fetchPartContent(final ObjectKey objectKey,
final SegmentManifest manifest,
final FetchPart part) throws StorageBackendException, IOException {
final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range);
final AtomicReference<InputStream> result = new AtomicReference<>();
try {
Expand All @@ -79,7 +79,7 @@ public InputStream partContent(final ObjectKey objectKey,
if (val == null) {
statsCounter.recordMiss();
try {
final InputStream partContent = fetchManager.partContent(objectKey, manifest, part);
final InputStream partContent = fetchManager.fetchPartContent(objectKey, manifest, part);
final T t = this.cachePartContent(fetchPartKey, partContent);
result.getAndSet(readCachedPartContent(t));
return t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void testGetPartContent() throws Exception {
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));

final var part = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1);
assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasContent("0123456789");
assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasContent("0123456789");
verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range());
}

Expand All @@ -94,7 +94,7 @@ void testGetPartContentWithEncryption() throws Exception {
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption);
final FetchManager fetchManager = new DefaultFetchManager(storage, aesEncryptionProvider);

assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT);
assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT);
verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range());
}

Expand All @@ -116,7 +116,7 @@ void testGetPartContentWithCompression() throws Exception {
final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null);
final FetchManager fetchManager = new DefaultFetchManager(storage, null);

assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT);
assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT);
verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException {
final int to = 34;
final FetchEnumeration fetchEnumeration =
new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1);
when(chunkManager.partContent(eq(SEGMENT_KEY), eq(manifest), any()))
when(chunkManager.fetchPartContent(eq(SEGMENT_KEY), eq(manifest), any()))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
// Then
assertThat(fetchEnumeration.firstChunk.id).isEqualTo(fetchEnumeration.lastChunk.id);
Expand All @@ -134,13 +134,13 @@ void shouldReturnRangeFromMultipleParts() throws StorageBackendException {
new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1);
final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1);
final var part1 = part0.next().get();
when(chunkManager.partContent(SEGMENT_KEY, manifest, part1))
when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part1))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
final var part2 = part1.next().get();
when(chunkManager.partContent(SEGMENT_KEY, manifest, part2))
when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part2))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
final var part3 = part2.next().get();
when(chunkManager.partContent(SEGMENT_KEY, manifest, part3))
when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part3))
.thenReturn(new ByteArrayInputStream(CHUNK_CONTENT));
// Then
assertThat(fetchEnumeration.firstChunk.id).isNotEqualTo(fetchEnumeration.lastChunk.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private static Stream<Arguments> caches() {
void shouldRecordMetrics(final Class<FetchCache<?>> fetchCacheClass, final Map<String, ?> config)
throws Exception {
// Given a fetch cache implementation
when(fetchManager.partContent(any(), any(), any()))
when(fetchManager.fetchPartContent(any(), any(), any()))
.thenReturn(new ByteArrayInputStream("test".getBytes()));
final var chunk = new Chunk(0, 0, 10, 0, 10);
when(chunkIndex.chunks()).thenReturn(List.of(chunk));
Expand All @@ -102,13 +102,13 @@ void shouldRecordMetrics(final Class<FetchCache<?>> fetchCacheClass, final Map<S
final var objectName = new ObjectName("aiven.kafka.server.tieredstorage.cache:type=fetch-cache");

// When getting a existing part from cache
fetchCache.partContent(OBJECT_KEY_PATH, segmentManifest, firstPart);
fetchCache.fetchPartContent(OBJECT_KEY_PATH, segmentManifest, firstPart);

// check cache size increases after first miss
assertThat(MBEAN_SERVER.getAttribute(objectName, "cache-size-total"))
.isEqualTo(1.0);

fetchCache.partContent(OBJECT_KEY_PATH, segmentManifest, firstPart);
fetchCache.fetchPartContent(OBJECT_KEY_PATH, segmentManifest, firstPart);

// Then the following metrics should be available
assertThat(MBEAN_SERVER.getAttribute(objectName, "cache-hits-total"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ void setUp() throws Exception {
doAnswer(invocation -> removalListener).when(fetchCache).removalListener();
final var chunkIndex = SEGMENT_MANIFEST.chunkIndex();
firstPart = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1);
when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0));
nextPart = firstPart.next().get();
when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1));
}

Expand All @@ -123,17 +123,19 @@ void noEviction() throws IOException, StorageBackendException {
"size", "-1"
));

final InputStream part0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
final InputStream part0 = fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(part0).hasBinaryContent(CHUNK_0);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
final InputStream cachedPart0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
final InputStream cachedPart0 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(cachedPart0).hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(fetchManager);

final InputStream part1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
final InputStream part1 = fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(part1).hasBinaryContent(CHUNK_1);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
final InputStream cachedPart1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
final InputStream cachedPart1 =
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(cachedPart1).hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(fetchManager);

Expand All @@ -147,19 +149,19 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte
"size", "-1"
));

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(fetchManager);

Thread.sleep(100);

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.hasBinaryContent(CHUNK_1);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.hasBinaryContent(CHUNK_1);
verifyNoMoreInteractions(fetchManager);

Expand All @@ -172,9 +174,9 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte
any(),
eq(RemovalCause.EXPIRED));

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
verify(fetchManager, times(2)).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
verify(fetchManager, times(2)).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
}

@Test
Expand All @@ -184,16 +186,16 @@ void sizeBasedEviction() throws IOException, StorageBackendException {
"size", "18"
));

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
verifyNoMoreInteractions(fetchManager);

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.hasBinaryContent(CHUNK_1);
verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);
verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart);

await().atMost(Duration.ofMillis(5000))
.pollDelay(Duration.ofSeconds(2))
Expand All @@ -202,11 +204,11 @@ void sizeBasedEviction() throws IOException, StorageBackendException {

verify(removalListener).onRemoval(any(FetchPartKey.class), any(), eq(RemovalCause.SIZE));

assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.hasBinaryContent(CHUNK_0);
assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.hasBinaryContent(CHUNK_1);
verify(fetchManager, times(3)).partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any());
verify(fetchManager, times(3)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any());
}

}
Expand All @@ -231,29 +233,29 @@ void setUp() {

@Test
void failedFetching() throws Exception {
when(fetchManager.partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()))
when(fetchManager.fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()))
.thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE))
.thenThrow(new IOException(TEST_EXCEPTION_MESSAGE));

assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.isInstanceOf(StorageBackendException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart))
.isInstanceOf(IOException.class)
.hasMessage(TEST_EXCEPTION_MESSAGE);
}

@Test
void failedReadingCachedValueWithInterruptedException() throws Exception {
when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.thenReturn(new ByteArrayInputStream(CHUNK_0));

doCallRealMethod().doAnswer(invocation -> {
throw new InterruptedException(TEST_EXCEPTION_MESSAGE);
}).when(fetchCache).readCachedPartContent(any());

fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(InterruptedException.class)
Expand All @@ -262,14 +264,14 @@ void failedReadingCachedValueWithInterruptedException() throws Exception {

@Test
void failedReadingCachedValueWithExecutionException() throws Exception {
when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn(
when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn(
new ByteArrayInputStream(CHUNK_0));
doCallRealMethod().doAnswer(invocation -> {
throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE));
}).when(fetchCache).readCachedPartContent(any());

fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart);
assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart))
.isInstanceOf(RuntimeException.class)
.hasCauseInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(RuntimeException.class)
Expand Down

0 comments on commit ad355e6

Please sign in to comment.