diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/Part.java b/core/src/main/java/io/aiven/kafka/tieredstorage/Part.java
new file mode 100644
index 000000000..67aec4816
--- /dev/null
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/Part.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
+
+/**
+ * A contiguous run of up to {@code partSize} chunks that is fetched from remote storage as one byte range.
+ *
+ * <p>Parts are aligned: the first chunk id of a part is a multiple of {@code partSize}, so every chunk of a
+ * segment belongs to exactly one part. The last part of a segment may hold fewer than {@code partSize} chunks.
+ */
+public class Part {
+    /** Id of the first chunk covered by this part. */
+    public final int firstChunkId;
+    /** Byte range spanning from the start of the first chunk to the end of the last chunk of this part. */
+    public final BytesRange range;
+    /** Chunks covered by this part, in index order. */
+    public final List<Chunk> chunks;
+
+    private final ChunkIndex chunkIndex;
+    private final int partSize;
+    private final int finalChunkId;
+
+    /**
+     * Creates the part that contains {@code chunk}.
+     *
+     * @param chunkIndex index of all chunks of the segment
+     * @param chunk      any chunk of the part to build
+     * @param partSize   maximum number of chunks per part, must be positive
+     * @throws IllegalArgumentException if {@code partSize} is not positive
+     */
+    public Part(final ChunkIndex chunkIndex,
+                final Chunk chunk,
+                final int partSize) {
+        this(
+            Objects.requireNonNull(chunkIndex, "chunkIndex cannot be null"),
+            partSize,
+            alignedChunkId(Objects.requireNonNull(chunk, "chunk cannot be null").id, partSize)
+        );
+    }
+
+    private Part(final ChunkIndex chunkIndex,
+                 final int partSize,
+                 final int firstChunkId) {
+        // Read the chunk list once instead of on every lookup.
+        final List<Chunk> allChunks = chunkIndex.chunks();
+        this.chunkIndex = chunkIndex;
+        this.partSize = partSize;
+        this.finalChunkId = allChunks.size() - 1;
+
+        this.firstChunkId = Math.min(firstChunkId, finalChunkId);
+        // long arithmetic: firstChunkId + partSize may exceed Integer.MAX_VALUE for very large part sizes.
+        final int lastChunkId = (int) Math.min((long) this.firstChunkId + partSize - 1, finalChunkId);
+        this.range = BytesRange.of(
+            allChunks.get(this.firstChunkId).range().from,
+            allChunks.get(lastChunkId).range().to);
+        this.chunks = allChunks.subList(this.firstChunkId, lastChunkId + 1);
+    }
+
+    /** Rounds {@code chunkId} down to the id of the first chunk of its part. */
+    private static int alignedChunkId(final int chunkId, final int partSize) {
+        if (partSize <= 0) {
+            throw new IllegalArgumentException("partSize must be positive, but was " + partSize);
+        }
+        return (chunkId / partSize) * partSize;
+    }
+
+    public BytesRange range() {
+        return range;
+    }
+
+    /**
+     * Returns the part that directly follows this one.
+     *
+     * @return the next part, or empty when this part already contains the final chunk of the segment
+     */
+    public Optional<Part> next() {
+        final long nextFirstChunkId = (long) firstChunkId + partSize;
+        if (nextFirstChunkId > finalChunkId) {
+            return Optional.empty();
+        }
+        return Optional.of(new Part(chunkIndex, partSize, (int) nextFirstChunkId));
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final Part part = (Part) o;
+        return firstChunkId == part.firstChunkId
+            && Objects.equals(range, part.range)
+            && Objects.equals(chunks, part.chunks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(firstChunkId, range, chunks);
+    }
+
+    /** Returns the original position of the first chunk of this part. */
+    public int startPosition() {
+        return chunks.get(0).originalPosition;
+    }
+
+    @Override
+    public String toString() {
+        return "Part{"
+            + "firstChunkId=" + firstChunkId
+            + ", range=" + range
+            + ", chunks=" + chunks.size()
+            + '}';
+    }
+}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 85bea09f9..2321a46b0 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -109,6 +109,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
     private boolean compressionHeuristic;
     private boolean encryptionEnabled;
     private int chunkSize;
+    private int partSize;
     private RsaEncryptionProvider rsaEncryptionProvider;
     private AesEncryptionProvider aesEncryptionProvider;
     private ObjectMapper mapper;
@@ -152,6 +153,7 @@ public void configure(final Map configs) {
         chunkManagerFactory.configure(configs);
         chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);
         chunkSize = config.chunkSize();
+        partSize = Math.max(1, 16 * 1024 * 1024 / chunkSize); // 16 MiB worth of chunks, never less than one
         compressionEnabled = config.compressionEnabled();
         compressionHeuristic = config.compressionHeuristicEnabled();
 
@@ -376,7 +378,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
                 Math.min(endPosition, remoteLogSegmentMetadata.segmentSizeInBytes() - 1)
             );
 
-            log.trace("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);
+            log.debug("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);
 
             metrics.recordSegmentFetch(
                 remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
@@ -387,7 +389,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
             final var suffix = ObjectKey.Suffix.LOG;
             final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
 
-            return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
+            return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize)
                 .toInputStream();
         } catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) {
             throw new
RemoteResourceNotFoundException(e); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java index dde21c86a..148009328 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java @@ -19,12 +19,13 @@ import java.io.IOException; import java.io.InputStream; +import io.aiven.kafka.tieredstorage.Part; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; public interface ChunkManager { - - InputStream getChunk(final String objectKeyPath, - final SegmentManifest manifest, - final int chunkId) throws StorageBackendException, IOException; + InputStream chunksContent(final String objectKeyPath, + final SegmentManifest manifest, + final Part part) + throws StorageBackendException, IOException; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java index 5310476d0..3cdcc11e6 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java @@ -17,10 +17,9 @@ package io.aiven.kafka.tieredstorage.chunkmanager; import java.io.InputStream; -import java.util.List; import java.util.Optional; -import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.Part; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; @@ -46,13 +45,13 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi * * @return an {@link InputStream} of the chunk, 
plain text (i.e., decrypted and decompressed). */ - public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest, - final int chunkId) throws StorageBackendException { - final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); + @Override + public InputStream chunksContent(final String objectKeyPath, + final SegmentManifest manifest, + final Part part) throws StorageBackendException { + final InputStream chunkContent = fetcher.fetch(objectKeyPath, part.range); - final InputStream chunkContent = fetcher.fetch(objectKeyPath, chunk.range()); - - DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk)); + DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, part.chunks); final Optional encryptionMetadata = manifest.encryption(); if (encryptionMetadata.isPresent()) { detransformEnum = new DecryptionChunkEnumeration( diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java index bea267398..0dbd8a3a7 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.Configurable; +import io.aiven.kafka.tieredstorage.Part; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -65,10 +66,11 @@ protected ChunkCache(final ChunkManager chunkManager) { * opened right when fetching from cache happens even if the actual value is removed from the cache, * the InputStream will still contain the data. 
*/
-    public InputStream getChunk(final String objectKeyPath,
-                                final SegmentManifest manifest,
-                                final int chunkId) throws StorageBackendException, IOException {
-        final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId);
+    @Override
+    public InputStream chunksContent(final String objectKeyPath,
+                                     final SegmentManifest manifest,
+                                     final Part part) throws StorageBackendException, IOException {
+        final ChunkKey chunkKey = new ChunkKey(objectKeyPath, part.firstChunkId);
         final AtomicReference<InputStream> result = new AtomicReference<>();
         try {
             return cache.asMap()
@@ -77,7 +79,7 @@ public InputStream getChunk(final String objectKeyPath,
                         statsCounter.recordMiss();
                         try {
                             final InputStream chunk =
-                                chunkManager.getChunk(objectKeyPath, manifest, chunkId);
+                                chunkManager.chunksContent(objectKeyPath, manifest, part);
                             final T t = this.cacheChunk(chunkKey, chunk);
                             result.getAndSet(cachedChunkToInputStream(t));
                             return t;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
index 9531b0ca0..3770e9610 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
@@ -23,51 +23,63 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Optional;
 
 import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.Part;
 import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
 import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 import org.apache.commons.io.input.BoundedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FetchChunkEnumeration implements Enumeration<InputStream> {
+    static final Logger log = LoggerFactory.getLogger(FetchChunkEnumeration.class);
+
     private final ChunkManager chunkManager;
     private final String objectKeyPath;
     private final SegmentManifest manifest;
     private final BytesRange range;
-    final int startChunkId;
-    final int lastChunkId;
+    final Part firstPart;
+    final Chunk firstChunk;
+    final Part lastPart;
+    final Chunk lastChunk;
     private final ChunkIndex chunkIndex;
-    int currentChunkId;
+    Optional<Part> currentPart;
     public boolean closed;
 
+    final int partSize;
+
     /**
-     *
-     * @param chunkManager provides chunk input to fetch from
+     * @param chunkManager  provides chunk input to fetch from
      * @param objectKeyPath required by chunkManager
-     * @param manifest provides to index to build response from
-     * @param range original offset range start/end position
+     * @param manifest      provides the index to build response from
+     * @param range         original offset range start/end position
+     * @param partSize      fetch part size
      */
     public FetchChunkEnumeration(final ChunkManager chunkManager,
                                  final String objectKeyPath,
                                  final SegmentManifest manifest,
-                                 final BytesRange range) {
+                                 final BytesRange range,
+                                 final int partSize) {
         this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
         this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
         this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
         this.range = Objects.requireNonNull(range, "range cannot be null");
+        this.partSize = partSize;
 
         this.chunkIndex = manifest.chunkIndex();
 
-        final Chunk firstChunk = getFirstChunk(range.from);
-        startChunkId = firstChunk.id;
-        currentChunkId = startChunkId;
-        final Chunk lastChunk = getLastChunk(range.to);
-        lastChunkId = lastChunk.id;
+        firstChunk = getFirstChunk(range.from);
+        firstPart = new Part(chunkIndex, firstChunk, this.partSize);
+        currentPart = Optional.of(firstPart);
+        lastChunk = getLastChunk(range.to);
+        lastPart = new Part(chunkIndex, lastChunk, this.partSize);
     }
 
     private Chunk getFirstChunk(final int fromPosition) {
@@ -91,55 +103,67 @@ private Chunk getLastChunk(final int endPosition) {
 
     @Override
     public boolean hasMoreElements() {
-        return !closed && currentChunkId <= lastChunkId;
+        return !closed && currentPart.isPresent();
     }
 
     @Override
     public InputStream nextElement() {
-        if (!hasMoreElements()) {
+        if (currentPart.isEmpty()) {
             throw new NoSuchElementException();
         }
 
-        InputStream chunkContent = getChunkContent(currentChunkId);
+        final InputStream partContent = partContent(currentPart.get());
 
-        final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId);
-        final int chunkStartPosition = currentChunk.originalPosition;
-        final boolean isAtFirstChunk = currentChunkId == startChunkId;
-        final boolean isAtLastChunk = currentChunkId == lastChunkId;
-        final boolean isSingleChunk = isAtFirstChunk && isAtLastChunk;
-        if (isSingleChunk) {
+        final int chunkStartPosition = currentPart.get().startPosition();
+        final boolean isAtFirstPart = currentPart.get().equals(firstPart);
+        final boolean isAtLastPart = currentPart.get().equals(lastPart);
+        final boolean isSinglePart = isAtFirstPart && isAtLastPart;
+        if (isSinglePart) {
             final int toSkip = range.from - chunkStartPosition;
             try {
-                chunkContent.skip(toSkip);
-                final int chunkSize = range.size();
-                chunkContent = new BoundedInputStream(chunkContent, chunkSize);
+                partContent.skip(toSkip);
             } catch (final IOException e) {
                 throw new RuntimeException(e);
             }
+
+            final int chunkSize = range.size();
+
+            log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);
+
+            currentPart = Optional.empty();
+            return new BoundedInputStream(partContent, chunkSize);
         } else {
-            if (isAtFirstChunk) {
+            if (isAtFirstPart) {
                 final int toSkip = range.from - chunkStartPosition;
                 try {
-                    chunkContent.skip(toSkip);
+                    partContent.skip(toSkip);
                 } catch (final IOException e) {
                     throw new RuntimeException(e);
                 }
             }
-            if (isAtLastChunk) {
+            if (isAtLastPart) {
                 final int chunkSize = range.to - chunkStartPosition + 1;
-                chunkContent = new BoundedInputStream(chunkContent, chunkSize);
+
+                log.trace("Returning part {} with size {}", currentPart.get(), chunkSize);
+
+                currentPart = Optional.empty();
+                return new BoundedInputStream(partContent, chunkSize);
             }
         }
 
-        currentChunkId += 1;
-        return chunkContent;
+        log.trace("Returning part {} with size {}", currentPart.get(), currentPart.get().range.size());
+
+        currentPart = currentPart.get().next();
+        return partContent;
     }
 
-    private InputStream getChunkContent(final int chunkId) {
+    private InputStream partContent(final Part part) {
         try {
-            return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
+            return chunkManager.chunksContent(objectKeyPath, manifest, part);
         } catch (final KeyNotFoundException e) {
+            // Keep the dedicated exception type: RemoteStorageManager translates it into
+            // RemoteResourceNotFoundException, a plain RuntimeException would hide the missing key.
             throw new KeyNotFoundRuntimeException(e);
         } catch (final StorageBackendException | IOException e) {
             throw new RuntimeException(e);
         }
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java
index 6335a8763..fd70f5b12 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java
@@ -21,6 +21,7 @@
 import java.io.ByteArrayInputStream;
 
 import io.aiven.kafka.tieredstorage.AesKeyAwareTest;
+import io.aiven.kafka.tieredstorage.Part;
 import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
@@ -51,12 +52,13 @@ class DefaultChunkManagerTest extends AesKeyAwareTest {
     void testGetChunk() throws Exception {
         final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);
 
-        final
SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasContent("0123456789"); + final var part = new Part(chunkIndex, chunkIndex.chunks().get(0), 1); + assertThat(chunkManager.chunksContent(OBJECT_KEY_PATH, manifest, part)).hasContent("0123456789"); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } @@ -73,14 +75,16 @@ void testGetChunkWithEncryption() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, encrypted.length, encrypted.length); + final var part = new Part(chunkIndex, chunkIndex.chunks().get(0), 1); + when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())).thenReturn( new ByteArrayInputStream(encrypted)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, - new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad), null); + new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad)); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(chunkManager.chunksContent(OBJECT_KEY_PATH, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } @@ -94,13 +98,15 @@ void testGetChunkWithCompression() throws Exception { } final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, compressed.length, compressed.length); + final var part = new Part(chunkIndex, chunkIndex.chunks().get(0), 1); + 
when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(chunkManager.chunksContent(OBJECT_KEY_PATH, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java index 76e3ea4d4..5630ac9f2 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.stream.Stream; +import io.aiven.kafka.tieredstorage.Part; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -39,7 +40,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.when; /** @@ -58,6 +58,8 @@ class ChunkCacheMetricsTest { ChunkManager chunkManager; @Mock SegmentManifest segmentManifest; + @Mock + Part firstPart; private static Stream caches() { return Stream.of( @@ -83,7 +85,7 @@ private static Stream caches() { void shouldRecordMetrics(final Class> chunkCacheClass, final Map config) throws Exception { // Given a chunk cache 
implementation - when(chunkManager.getChunk(any(), any(), anyInt())) + when(chunkManager.chunksContent(any(), any(), any())) .thenReturn(new ByteArrayInputStream("test".getBytes())); final var chunkCache = chunkCacheClass.getDeclaredConstructor(ChunkManager.class).newInstance(chunkManager); @@ -92,13 +94,13 @@ void shouldRecordMetrics(final Class> chunkCacheClass, final Map removalListener; + Part firstPart; + Part nextPart; + @BeforeEach void setUp() throws Exception { doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); + final var chunkIndex = SEGMENT_MANIFEST.chunkIndex(); + firstPart = new Part(chunkIndex, chunkIndex.chunks().get(0), 1); + when(chunkManager.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); + nextPart = firstPart.next().get(); + when(chunkManager.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); } @Test @@ -103,17 +108,17 @@ void noEviction() throws IOException, StorageBackendException { "size", "-1" )); - final InputStream chunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + final InputStream chunk0 = chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(chunk0).hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + final InputStream cachedChunk0 = chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); 
verifyNoMoreInteractions(chunkManager); - final InputStream chunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + final InputStream chunk1 = chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(chunk1).hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); + final InputStream cachedChunk1 = chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -127,20 +132,20 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte "size", "-1" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); Thread.sleep(100); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + 
.hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100)) @@ -152,28 +157,28 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte any(), eq(RemovalCause.EXPIRED)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager, times(2)).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager, times(2)).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); } @Test void sizeBasedEviction() throws IOException, StorageBackendException { chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "18" + "retention.ms", "-1", + "size", "18" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager).chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); await().atMost(Duration.ofMillis(5000)) .pollDelay(Duration.ofSeconds(2)) @@ -182,17 +187,20 @@ void sizeBasedEviction() throws IOException, StorageBackendException { 
verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager, times(3)).getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt()); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager, times(3)).chunksContent(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), any()); } } @Nested class ErrorHandlingTests { + Part firstPart; + Part nextPart; + private final Map configs = Map.of( "retention.ms", "-1", "size", "-1" @@ -201,57 +209,56 @@ class ErrorHandlingTests { @BeforeEach void setUp() { chunkCache.configure(configs); + + firstPart = new Part(SEGMENT_MANIFEST.chunkIndex(), SEGMENT_MANIFEST.chunkIndex().chunks().get(0), 1); + nextPart = firstPart.next().get(); } @Test void failedFetching() throws Exception { - when(chunkManager.getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt())) - .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) - .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); - - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(StorageBackendException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .isInstanceOf(IOException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); + when(chunkManager.chunksContent(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), any())) + .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) + .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); + + assertThatThrownBy(() -> chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + 
.isInstanceOf(StorageBackendException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); + assertThatThrownBy(() -> chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .isInstanceOf(IOException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithInterruptedException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .thenReturn(new ByteArrayInputStream(CHUNK_0)); + when(chunkManager.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .thenReturn(new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new InterruptedException(TEST_EXCEPTION_MESSAGE); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(InterruptedException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(InterruptedException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithExecutionException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( - new ByteArrayInputStream(CHUNK_0)); + when(chunkManager.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn( + new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> 
chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(RuntimeException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> chunkCache.chunksContent(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); } } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 9fb048a53..f055f418b 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -62,7 +62,7 @@ class FetchChunkEnumerationSourceInputStreamClosingTest { static final FixedSizeChunkIndex CHUNK_INDEX = new FixedSizeChunkIndex( CHUNK_SIZE, CHUNK_SIZE * 3, CHUNK_SIZE, CHUNK_SIZE); static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1( - CHUNK_INDEX, false, null, null); + CHUNK_INDEX, false, null); TestObjectFetcher fetcher; @@ -79,7 +79,7 @@ void test(final Map config, final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(config); final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null); - final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range) + final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range, 1) .toInputStream(); if (readFully) { is.readAllBytes(); diff --git 
a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index c37b9987a..8f82deeb9 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream; import java.util.NoSuchElementException; +import io.aiven.kafka.tieredstorage.Part; import io.aiven.kafka.tieredstorage.chunkmanager.DefaultChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; @@ -33,6 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -41,7 +44,7 @@ class FetchChunkEnumerationTest { DefaultChunkManager chunkManager; final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 100, 10, 100); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); static final String SEGMENT_KEY_PATH = "topic/segment"; @@ -52,13 +55,13 @@ class FetchChunkEnumerationTest { @Test void failsWhenLargerStartPosition() { // Given - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); // When final int from = 1000; final int to = from + 1; // Then assertThatThrownBy( - () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to))) + () -> new 
FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1)) .hasMessage("Invalid start position " + from + " in segment path topic/segment"); } @@ -71,9 +74,9 @@ void endPositionIsWithinIndex() { final int to = 80; // Then final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - assertThat(fetchChunk.startChunkId).isEqualTo(0); - assertThat(fetchChunk.lastChunkId).isEqualTo(8); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + assertThat(fetchChunk.firstChunk.id).isEqualTo(0); + assertThat(fetchChunk.lastChunk.id).isEqualTo(8); } // - End position outside index @@ -84,10 +87,10 @@ void endPositionIsOutsideIndex() { final int from = 0; final int to = 110; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); // Then - assertThat(fetchChunk.startChunkId).isEqualTo(0); - assertThat(fetchChunk.lastChunkId).isEqualTo(9); + assertThat(fetchChunk.firstChunk.id).isEqualTo(0); + assertThat(fetchChunk.lastChunk.id).isEqualTo(9); } // - Single chunk @@ -98,11 +101,11 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { final int from = 32; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, fetchChunk.currentChunkId)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + when(chunkManager.chunksContent(eq(SEGMENT_KEY_PATH), eq(manifest), any())) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then - assertThat(fetchChunk.startChunkId).isEqualTo(fetchChunk.lastChunkId); + 
assertThat(fetchChunk.firstChunk.id).isEqualTo(fetchChunk.lastChunk.id); assertThat(fetchChunk.nextElement()).hasContent("234"); assertThat(fetchChunk.hasMoreElements()).isFalse(); assertThatThrownBy(fetchChunk::nextElement).isInstanceOf(NoSuchElementException.class); @@ -116,15 +119,19 @@ void shouldReturnRangeFromMultipleChunks() throws StorageBackendException { final int from = 15; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 1)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + final var first = new Part(chunkIndex, chunkIndex.chunks().get(0), 1); + final var part1 = first.next().get(); + when(chunkManager.chunksContent(SEGMENT_KEY_PATH, manifest, part1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 2)) + final var part2 = part1.next().get(); + when(chunkManager.chunksContent(SEGMENT_KEY_PATH, manifest, part2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 3)) + final var part3 = part2.next().get(); + when(chunkManager.chunksContent(SEGMENT_KEY_PATH, manifest, part3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then - assertThat(fetchChunk.startChunkId).isNotEqualTo(fetchChunk.lastChunkId); + assertThat(fetchChunk.firstChunk.id).isNotEqualTo(fetchChunk.lastChunk.id); assertThat(fetchChunk.nextElement()).hasContent("56789"); assertThat(fetchChunk.nextElement()).hasContent("0123456789"); assertThat(fetchChunk.nextElement()).hasContent("01234");