From ad355e6115e56673946aedd2245c46a86daecf65 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 4 Oct 2023 13:40:20 +0300 Subject: [PATCH] fixup! refactor: rename chunk manager into fetch manager --- .../fetch/DefaultFetchManager.java | 6 +- .../tieredstorage/fetch/FetchEnumeration.java | 2 +- .../tieredstorage/fetch/FetchManager.java | 6 +- .../tieredstorage/fetch/cache/FetchCache.java | 8 +-- .../fetch/DefaultFetchManagerTest.java | 6 +- .../fetch/FetchEnumerationTest.java | 8 +-- .../fetch/cache/FetchCacheMetricsTest.java | 6 +- .../fetch/cache/FetchCacheTest.java | 68 ++++++++++--------- 8 files changed, 56 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManager.java index eee31b62b..db4331b33 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManager.java @@ -46,9 +46,9 @@ public DefaultFetchManager(final ObjectFetcher fetcher, final AesEncryptionProvi * @return an {@link InputStream} of the fetch part, plain text (i.e., decrypted and decompressed). */ @Override - public InputStream partContent(final ObjectKey objectKey, - final SegmentManifest manifest, - final FetchPart part) throws StorageBackendException { + public InputStream fetchPartContent(final ObjectKey objectKey, + final SegmentManifest manifest, + final FetchPart part) throws StorageBackendException { final InputStream partContent = fetcher.fetch(objectKey, part.range); DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(partContent, part.chunks); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java index 5f084daf4..7cb3cc1ee 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumeration.java @@ -160,7 +160,7 @@ public InputStream nextElement() { private InputStream partChunks(final FetchPart part) { try { - return fetchManager.partContent(objectKey, manifest, part); + return fetchManager.fetchPartContent(objectKey, manifest, part); } catch (final KeyNotFoundException e) { throw new KeyNotFoundRuntimeException(e); } catch (final StorageBackendException | IOException e) { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchManager.java index bc95cabf6..7bdd7a33f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/FetchManager.java @@ -24,8 +24,8 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackendException; public interface FetchManager { - InputStream partContent(final ObjectKey objectKey, - final SegmentManifest manifest, - final FetchPart part) + InputStream fetchPartContent(final ObjectKey objectKey, + final SegmentManifest manifest, + final FetchPart part) throws StorageBackendException, IOException; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java index ce97fb46b..de3b66eb6 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCache.java @@ -68,9 +68,9 @@ protected FetchCache(final FetchManager fetchManager) { * the InputStream will still contain the data. */ @Override - public InputStream partContent(final ObjectKey objectKey, - final SegmentManifest manifest, - final FetchPart part) throws StorageBackendException, IOException { + public InputStream fetchPartContent(final ObjectKey objectKey, + final SegmentManifest manifest, + final FetchPart part) throws StorageBackendException, IOException { final FetchPartKey fetchPartKey = new FetchPartKey(objectKey.value(), part.range); final AtomicReference result = new AtomicReference<>(); try { @@ -79,7 +79,7 @@ public InputStream partContent(final ObjectKey objectKey, if (val == null) { statsCounter.recordMiss(); try { - final InputStream partContent = fetchManager.partContent(objectKey, manifest, part); + final InputStream partContent = fetchManager.fetchPartContent(objectKey, manifest, part); final T t = this.cachePartContent(fetchPartKey, partContent); result.getAndSet(readCachedPartContent(t)); return t; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManagerTest.java index 7c38a00a2..8546da136 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultFetchManagerTest.java @@ -68,7 +68,7 @@ void testGetPartContent() throws Exception { .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); final var part = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); - assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasContent("0123456789"); + assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasContent("0123456789"); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @@ -94,7 +94,7 @@ void testGetPartContentWithEncryption() throws Exception { final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption); final FetchManager fetchManager = new DefaultFetchManager(storage, aesEncryptionProvider); - assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @@ -116,7 +116,7 @@ void testGetPartContentWithCompression() throws Exception { final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null); final FetchManager fetchManager = new DefaultFetchManager(storage, null); - assertThat(fetchManager.partContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(fetchManager.fetchPartContent(OBJECT_KEY, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java index 540274e45..69899518c 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchEnumerationTest.java @@ -114,7 +114,7 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { final int to = 34; final FetchEnumeration fetchEnumeration = new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); - when(chunkManager.partContent(eq(SEGMENT_KEY), eq(manifest), any())) + when(chunkManager.fetchPartContent(eq(SEGMENT_KEY), eq(manifest), any())) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchEnumeration.firstChunk.id).isEqualTo(fetchEnumeration.lastChunk.id); @@ -134,13 +134,13 @@ void shouldReturnRangeFromMultipleParts() throws StorageBackendException { new FetchEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to), 1); final var part0 = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); final var part1 = part0.next().get(); - when(chunkManager.partContent(SEGMENT_KEY, manifest, part1)) + when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); final var part2 = part1.next().get(); - when(chunkManager.partContent(SEGMENT_KEY, manifest, part2)) + when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); final var part3 = part2.next().get(); - when(chunkManager.partContent(SEGMENT_KEY, manifest, part3)) + when(chunkManager.fetchPartContent(SEGMENT_KEY, manifest, part3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchEnumeration.firstChunk.id).isNotEqualTo(fetchEnumeration.lastChunk.id); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheMetricsTest.java index e499311dc..e6a1b6627 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/FetchCacheMetricsTest.java @@ -90,7 +90,7 @@ private static Stream caches() { void shouldRecordMetrics(final Class> fetchCacheClass, final Map config) throws Exception { // Given a fetch cache implementation - when(fetchManager.partContent(any(), any(), any())) + when(fetchManager.fetchPartContent(any(), any(), any())) .thenReturn(new ByteArrayInputStream("test".getBytes())); final var chunk = new Chunk(0, 0, 10, 0, 10); when(chunkIndex.chunks()).thenReturn(List.of(chunk)); @@ -102,13 +102,13 @@ void shouldRecordMetrics(final Class> fetchCacheClass, final Map removalListener).when(fetchCache).removalListener(); final var chunkIndex = SEGMENT_MANIFEST.chunkIndex(); firstPart = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); - when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); nextPart = firstPart.next().get(); - when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); } @@ -123,17 +123,19 @@ void noEviction() throws IOException, StorageBackendException { "size", "-1" )); - final InputStream part0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + final InputStream part0 = fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(part0).hasBinaryContent(CHUNK_0); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); - final InputStream cachedPart0 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + final InputStream cachedPart0 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(cachedPart0).hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(fetchManager); - final InputStream part1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + final InputStream part1 = fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(part1).hasBinaryContent(CHUNK_1); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); - final InputStream cachedPart1 = fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + final InputStream cachedPart1 = + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(cachedPart1).hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(fetchManager); @@ -147,19 +149,19 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte "size", "-1" )); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(fetchManager); Thread.sleep(100); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .hasBinaryContent(CHUNK_1); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(fetchManager); @@ -172,9 +174,9 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte any(), eq(RemovalCause.EXPIRED)); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); - verify(fetchManager, times(2)).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + verify(fetchManager, times(2)).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); } @Test @@ -184,16 +186,16 @@ void sizeBasedEviction() throws IOException, StorageBackendException { "size", "18" )); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(fetchManager); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .hasBinaryContent(CHUNK_1); - verify(fetchManager).partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); + verify(fetchManager).fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart); await().atMost(Duration.ofMillis(5000)) .pollDelay(Duration.ofSeconds(2)) @@ -202,11 +204,11 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(removalListener).onRemoval(any(FetchPartKey.class), any(), eq(RemovalCause.SIZE)); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .hasBinaryContent(CHUNK_0); - assertThat(fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + assertThat(fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .hasBinaryContent(CHUNK_1); - verify(fetchManager, times(3)).partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); + verify(fetchManager, times(3)).fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any()); } } @@ -231,29 +233,29 @@ void setUp() { @Test void failedFetching() throws Exception { - when(fetchManager.partContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any())) + when(fetchManager.fetchPartContent(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), any())) .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); - assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .isInstanceOf(StorageBackendException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); - assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) + assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, nextPart)) .isInstanceOf(IOException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithInterruptedException() throws Exception { - when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .thenReturn(new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new InterruptedException(TEST_EXCEPTION_MESSAGE); }).when(fetchCache).readCachedPartContent(any()); - fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); - assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(InterruptedException.class) @@ -262,14 +264,14 @@ void failedReadingCachedValueWithInterruptedException() throws Exception { @Test void failedReadingCachedValueWithExecutionException() throws Exception { - when(fetchManager.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn( + when(fetchManager.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn( new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); }).when(fetchCache).readCachedPartContent(any()); - fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); - assertThatThrownBy(() -> fetchCache.partContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) + fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> fetchCache.fetchPartContent(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, firstPart)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(RuntimeException.class)