From 816db129bf4ccc777395a488cd7ff4fe29b63273 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 28 Sep 2023 21:59:44 +0300 Subject: [PATCH] refactor: fetch and catch parts of segment instead of chunks To decouple fetching rate from transformation, a new concept of Part is introduced. The goal is to download larger ranges (parts) and transform smaller chunks. --- .../tieredstorage/RemoteStorageManager.java | 6 +- .../chunkmanager/ChunkManager.java | 9 +- .../chunkmanager/DefaultChunkManager.java | 15 +- .../chunkmanager/cache/ChunkCache.java | 15 +- .../config/RemoteStorageManagerConfig.java | 23 +++ .../transform/FetchChunkEnumeration.java | 88 ++++++---- .../chunkmanager/DefaultChunkManagerTest.java | 18 ++- .../cache/ChunkCacheMetricsTest.java | 10 +- .../chunkmanager/cache/ChunkCacheTest.java | 151 +++++++++--------- .../RemoteStorageManagerConfigTest.java | 43 +++++ ...umerationSourceInputStreamClosingTest.java | 4 +- .../transform/FetchChunkEnumerationTest.java | 41 +++-- 12 files changed, 268 insertions(+), 155 deletions(-) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 85bea09f9..49fed2d4a 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -109,6 +109,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote. private boolean compressionHeuristic; private boolean encryptionEnabled; private int chunkSize; + private int partSize; private RsaEncryptionProvider rsaEncryptionProvider; private AesEncryptionProvider aesEncryptionProvider; private ObjectMapper mapper; @@ -152,6 +153,7 @@ public void configure(final Map configs) { chunkManagerFactory.configure(configs); chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider); chunkSize = config.chunkSize(); + partSize = config.fetchPartSize() / chunkSize; // e.g. 16MB/100KB compressionEnabled = config.compressionEnabled(); compressionHeuristic = config.compressionHeuristicEnabled(); @@ -376,7 +378,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme Math.min(endPosition, remoteLogSegmentMetadata.segmentSizeInBytes() - 1) ); - log.trace("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range); + log.debug("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range); metrics.recordSegmentFetch( remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), @@ -387,7 +389,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme final var suffix = ObjectKey.Suffix.LOG; final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix); - return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range) + return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range, partSize) .toInputStream(); } catch (final KeyNotFoundException | KeyNotFoundRuntimeException e) { throw new RemoteResourceNotFoundException(e); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java index dde21c86a..d90ba4c7b 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java @@ -19,12 +19,13 @@ import java.io.IOException; import java.io.InputStream; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; public interface ChunkManager { - - InputStream getChunk(final String objectKeyPath, - final SegmentManifest manifest, - final int chunkId) throws StorageBackendException, IOException; + InputStream partChunks(final String objectKeyPath, + final SegmentManifest manifest, + final FetchPart part) + throws StorageBackendException, IOException; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java index 8e5bbc2f0..d0ffdf143 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java @@ -17,10 +17,9 @@ package io.aiven.kafka.tieredstorage.chunkmanager; import java.io.InputStream; -import java.util.List; import java.util.Optional; -import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; @@ -46,13 +45,13 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi * * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed). */ - public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest, - final int chunkId) throws StorageBackendException { - final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); + @Override + public InputStream partChunks(final String objectKeyPath, + final SegmentManifest manifest, + final FetchPart part) throws StorageBackendException { + final InputStream chunkContent = fetcher.fetch(objectKeyPath, part.range); - final InputStream chunkContent = fetcher.fetch(objectKeyPath, chunk.range()); - - DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk)); + DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, part.chunks); final Optional encryptionMetadata = manifest.encryption(); if (encryptionMetadata.isPresent()) { detransformEnum = new DecryptionChunkEnumeration( diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java index bea267398..c677e28f2 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.Configurable; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -58,17 +59,18 @@ protected ChunkCache(final ChunkManager chunkManager) { } /** - * Fetches a specific chunk from remote storage and stores into the cache. + * Fetches chunks of a specific range defined by {@code Part} from remote storage and stores into the cache. * Since it's not possible to cache an opened InputStream, the actual data is cached, and everytime * there is a call to cache the InputStream is recreated from the data stored in cache and stored into local * variable. This also allows solving the race condition between eviction and fetching. Since the InputStream is * opened right when fetching from cache happens even if the actual value is removed from the cache, * the InputStream will still contain the data. */ - public InputStream getChunk(final String objectKeyPath, - final SegmentManifest manifest, - final int chunkId) throws StorageBackendException, IOException { - final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId); + @Override + public InputStream partChunks(final String objectKeyPath, + final SegmentManifest manifest, + final FetchPart part) throws StorageBackendException, IOException { + final ChunkKey chunkKey = new ChunkKey(objectKeyPath, part.firstChunkId); final AtomicReference result = new AtomicReference<>(); try { return cache.asMap() @@ -76,8 +78,7 @@ public InputStream getChunk(final String objectKeyPath, if (val == null) { statsCounter.recordMiss(); try { - final InputStream chunk = - chunkManager.getChunk(objectKeyPath, manifest, chunkId); + final InputStream chunk = chunkManager.partChunks(objectKeyPath, manifest, part); final T t = this.cacheChunk(chunkKey, chunk); result.getAndSet(cachedChunkToInputStream(t)); return t; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index 5e1c9e4ad..8c3e1d991 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -65,6 +65,10 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String CHUNK_SIZE_CONFIG = "chunk.size"; private static final String CHUNK_SIZE_DOC = "The chunk size of log files"; + private static final String FETCH_PART_SIZE_CONFIG = "fetch.part.size"; + private static final String FETCH_PART_SIZE_DOC = "The size of parts used to fetch from tiered storage. " + + "Has to be larger than chunk size."; + private static final String COMPRESSION_ENABLED_CONFIG = "compression.enabled"; private static final String COMPRESSION_ENABLED_DOC = "Whether to enable compression"; @@ -141,6 +145,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig { ConfigDef.Importance.HIGH, CHUNK_SIZE_DOC ); + CONFIG.define( + FETCH_PART_SIZE_CONFIG, + ConfigDef.Type.INT, + 16 * 1024 * 1024, // 16MiB + ConfigDef.Range.between(1, Integer.MAX_VALUE), + ConfigDef.Importance.MEDIUM, + FETCH_PART_SIZE_DOC + ); CONFIG.define( COMPRESSION_ENABLED_CONFIG, @@ -298,9 +310,16 @@ public RemoteStorageManagerConfig(final Map props) { } private void validate() { + validateSizes(); validateCompression(); } + private void validateSizes() { + if (getInt(CHUNK_SIZE_CONFIG) > getInt(FETCH_PART_SIZE_CONFIG)) { + throw new ConfigException(FETCH_PART_SIZE_CONFIG + " must be larger than " + CHUNK_SIZE_CONFIG); + } + } + private void validateCompression() { if (getBoolean(COMPRESSION_HEURISTIC_ENABLED_CONFIG) && !getBoolean(COMPRESSION_ENABLED_CONFIG)) { throw new ConfigException( @@ -339,6 +358,10 @@ public int chunkSize() { return getInt(CHUNK_SIZE_CONFIG); } + public int fetchPartSize() { + return getInt(FETCH_PART_SIZE_CONFIG); + } + public boolean compressionEnabled() { return getBoolean(COMPRESSION_ENABLED_CONFIG); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java index 9531b0ca0..b75160622 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java @@ -23,8 +23,10 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; @@ -33,41 +35,51 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.apache.commons.io.input.BoundedInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FetchChunkEnumeration implements Enumeration { + static final Logger log = LoggerFactory.getLogger(FetchChunkEnumeration.class); + private final ChunkManager chunkManager; private final String objectKeyPath; private final SegmentManifest manifest; private final BytesRange range; - final int startChunkId; - final int lastChunkId; + final FetchPart firstPart; + final Chunk firstChunk; + final FetchPart lastPart; + final Chunk lastChunk; private final ChunkIndex chunkIndex; - int currentChunkId; + Optional currentPart; public boolean closed; + final int partSize; + /** - * - * @param chunkManager provides chunk input to fetch from + * @param chunkManager provides chunk input to fetch from * @param objectKeyPath required by chunkManager - * @param manifest provides to index to build response from - * @param range original offset range start/end position + * @param manifest provides to index to build response from + * @param range original offset range start/end position + * @param partSize fetch part size */ public FetchChunkEnumeration(final ChunkManager chunkManager, final String objectKeyPath, final SegmentManifest manifest, - final BytesRange range) { + final BytesRange range, + final int partSize) { this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null"); this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null"); this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null"); this.range = Objects.requireNonNull(range, "range cannot be null"); + this.partSize = partSize; this.chunkIndex = manifest.chunkIndex(); - final Chunk firstChunk = getFirstChunk(range.from); - startChunkId = firstChunk.id; - currentChunkId = startChunkId; - final Chunk lastChunk = getLastChunk(range.to); - lastChunkId = lastChunk.id; + firstChunk = getFirstChunk(range.from); + firstPart = new FetchPart(chunkIndex, firstChunk, this.partSize); + currentPart = Optional.of(firstPart); + lastChunk = getLastChunk(range.to); + lastPart = new FetchPart(chunkIndex, lastChunk, this.partSize); } private Chunk getFirstChunk(final int fromPosition) { @@ -91,53 +103,63 @@ private Chunk getLastChunk(final int endPosition) { @Override public boolean hasMoreElements() { - return !closed && currentChunkId <= lastChunkId; + return !closed && currentPart.isPresent(); } @Override public InputStream nextElement() { - if (!hasMoreElements()) { + if (currentPart.isEmpty()) { throw new NoSuchElementException(); } - InputStream chunkContent = getChunkContent(currentChunkId); + final InputStream partContent = partChunks(currentPart.get()); - final Chunk currentChunk = chunkIndex.chunks().get(currentChunkId); - final int chunkStartPosition = currentChunk.originalPosition; - final boolean isAtFirstChunk = currentChunkId == startChunkId; - final boolean isAtLastChunk = currentChunkId == lastChunkId; - final boolean isSingleChunk = isAtFirstChunk && isAtLastChunk; - if (isSingleChunk) { + final int chunkStartPosition = currentPart.get().startPosition(); + final boolean isAtFirstPart = currentPart.get().equals(firstPart); + final boolean isAtLastPart = currentPart.get().equals(lastPart); + final boolean isSinglePart = isAtFirstPart && isAtLastPart; + if (isSinglePart) { final int toSkip = range.from - chunkStartPosition; try { - chunkContent.skip(toSkip); - final int chunkSize = range.size(); - chunkContent = new BoundedInputStream(chunkContent, chunkSize); + partContent.skip(toSkip); } catch (final IOException e) { throw new RuntimeException(e); } + + final int chunkSize = range.size(); + + log.trace("Returning part {} with size {}", currentPart.get(), chunkSize); + + currentPart = Optional.empty(); + return new BoundedInputStream(partContent, chunkSize); } else { - if (isAtFirstChunk) { + if (isAtFirstPart) { final int toSkip = range.from - chunkStartPosition; try { - chunkContent.skip(toSkip); + partContent.skip(toSkip); } catch (final IOException e) { throw new RuntimeException(e); } } - if (isAtLastChunk) { + if (isAtLastPart) { final int chunkSize = range.to - chunkStartPosition + 1; - chunkContent = new BoundedInputStream(chunkContent, chunkSize); + + log.trace("Returning part {} with size {}", currentPart.get(), chunkSize); + + currentPart = Optional.empty(); + return new BoundedInputStream(partContent, chunkSize); } } - currentChunkId += 1; - return chunkContent; + log.trace("Returning part {} with size {}", currentPart.get(), currentPart.get().range.size()); + + currentPart = currentPart.get().next(); + return partContent; } - private InputStream getChunkContent(final int chunkId) { + private InputStream partChunks(final FetchPart part) { try { - return chunkManager.getChunk(objectKeyPath, manifest, chunkId); + return chunkManager.partChunks(objectKeyPath, manifest, part); } catch (final KeyNotFoundException e) { throw new KeyNotFoundRuntimeException(e); } catch (final StorageBackendException | IOException e) { diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index 6335a8763..730ea3579 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream; import io.aiven.kafka.tieredstorage.AesKeyAwareTest; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; @@ -51,12 +52,13 @@ class DefaultChunkManagerTest extends AesKeyAwareTest { void testGetChunk() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasContent("0123456789"); + final var part = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + assertThat(chunkManager.partChunks(OBJECT_KEY_PATH, manifest, part)).hasContent("0123456789"); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } @@ -73,14 +75,16 @@ void testGetChunkWithEncryption() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, encrypted.length, encrypted.length); + final var part = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())).thenReturn( new ByteArrayInputStream(encrypted)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, - new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad), null); + new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad)); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(chunkManager.partChunks(OBJECT_KEY_PATH, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } @@ -94,13 +98,15 @@ void testGetChunkWithCompression() throws Exception { } final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, compressed.length, compressed.length); + final var part = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + assertThat(chunkManager.partChunks(OBJECT_KEY_PATH, manifest, part)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java index 76e3ea4d4..53c7dd6a3 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.stream.Stream; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -39,7 +40,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.when; /** @@ -58,6 +58,8 @@ class ChunkCacheMetricsTest { ChunkManager chunkManager; @Mock SegmentManifest segmentManifest; + @Mock + FetchPart firstPart; private static Stream caches() { return Stream.of( @@ -83,7 +85,7 @@ private static Stream caches() { void shouldRecordMetrics(final Class> chunkCacheClass, final Map config) throws Exception { // Given a chunk cache implementation - when(chunkManager.getChunk(any(), any(), anyInt())) + when(chunkManager.partChunks(any(), any(), any())) .thenReturn(new ByteArrayInputStream("test".getBytes())); final var chunkCache = chunkCacheClass.getDeclaredConstructor(ChunkManager.class).newInstance(chunkManager); @@ -92,13 +94,13 @@ void shouldRecordMetrics(final Class> chunkCacheClass, final Map removalListener; + FetchPart firstPart; + FetchPart nextPart; + @BeforeEach void setUp() throws Exception { doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); + final var chunkIndex = SEGMENT_MANIFEST.chunkIndex(); + firstPart = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + when(chunkManager.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); + nextPart = firstPart.next().get(); + when(chunkManager.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); } @Test @@ -103,17 +108,17 @@ void noEviction() throws IOException, StorageBackendException { "size", "-1" )); - final InputStream chunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + final InputStream chunk0 = chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(chunk0).hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + final InputStream cachedChunk0 = chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - final InputStream chunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + final InputStream chunk1 = chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(chunk1).hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); + final InputStream cachedChunk1 = chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -127,20 +132,20 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte "size", "-1" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); Thread.sleep(100); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100)) @@ -152,28 +157,28 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte any(), eq(RemovalCause.EXPIRED)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager, times(2)).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager, times(2)).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); } @Test void sizeBasedEviction() throws IOException, StorageBackendException { chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "18" + "retention.ms", "-1", + "size", "18" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager).partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart); await().atMost(Duration.ofMillis(5000)) .pollDelay(Duration.ofSeconds(2)) @@ -182,17 +187,20 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager, times(3)).getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt()); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .hasBinaryContent(CHUNK_1); + verify(chunkManager, times(3)).partChunks(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), any()); } } @Nested class ErrorHandlingTests { + FetchPart firstPart; + FetchPart nextPart; + private final Map configs = Map.of( "retention.ms", "-1", "size", "-1" @@ -201,57 +209,56 @@ class ErrorHandlingTests { @BeforeEach void setUp() { chunkCache.configure(configs); + + firstPart = new FetchPart(SEGMENT_MANIFEST.chunkIndex(), SEGMENT_MANIFEST.chunkIndex().chunks().get(0), 1); + nextPart = firstPart.next().get(); } @Test void failedFetching() throws Exception { - when(chunkManager.getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt())) - .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) - .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); - - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(StorageBackendException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) - .isInstanceOf(IOException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); + when(chunkManager.partChunks(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), any())) + .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) + .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); + + assertThatThrownBy(() -> chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .isInstanceOf(StorageBackendException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); + assertThatThrownBy(() -> chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, nextPart)) + .isInstanceOf(IOException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithInterruptedException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .thenReturn(new ByteArrayInputStream(CHUNK_0)); + when(chunkManager.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .thenReturn(new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new InterruptedException(TEST_EXCEPTION_MESSAGE); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(InterruptedException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(InterruptedException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithExecutionException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( - new ByteArrayInputStream(CHUNK_0)); + when(chunkManager.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)).thenReturn( + new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(RuntimeException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart); + assertThatThrownBy(() -> chunkCache.partChunks(SEGMENT_KEY, SEGMENT_MANIFEST, firstPart)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); } } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java index 3b3cd86d9..c987bee25 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java @@ -45,6 +45,7 @@ void minimalConfig() { assertThat(config.segmentManifestCacheSize()).hasValue(1000L); assertThat(config.segmentManifestCacheRetention()).hasValue(Duration.ofHours(1)); assertThat(config.chunkSize()).isEqualTo(123); + assertThat(config.fetchPartSize()).isEqualTo(16 * 1024 * 1024); assertThat(config.compressionEnabled()).isFalse(); assertThat(config.compressionHeuristicEnabled()).isFalse(); assertThat(config.encryptionEnabled()).isFalse(); @@ -54,6 +55,18 @@ void minimalConfig() { assertThat(config.customMetadataKeysIncluded()).isEmpty(); } + @Test + void validFetchPartSize() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "fetch.part.size", "124" + ) + ); + assertThat(config.fetchPartSize()).isEqualTo(124); + } + @Test void segmentManifestCacheSizeUnbounded() { final var config = new RemoteStorageManagerConfig( @@ -277,6 +290,36 @@ void objectStorageFactoryIsConfigured() { ))); } + @Test + void invalidFetchPartSizeRange() { + assertThatThrownBy(() -> new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "10", + "fetch.part.size", "0" + ) + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value 0 for configuration fetch.part.size: Value must be at least 1"); + + assertThatThrownBy(() -> new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "10", + "fetch.part.size", Long.toString((long) Integer.MAX_VALUE + 1) + ) + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value 2147483648 for configuration fetch.part.size: Not a number of type INT"); + + assertThatThrownBy(() -> new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "10", + "fetch.part.size", "5" + ) + )).isInstanceOf(ConfigException.class) + .hasMessage("fetch.part.size must be larger than chunk.size"); + } + @Test void invalidChunkSizeRange() { assertThatThrownBy(() -> new RemoteStorageManagerConfig( diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 9fb048a53..f055f418b 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -62,7 +62,7 @@ class FetchChunkEnumerationSourceInputStreamClosingTest { static final FixedSizeChunkIndex CHUNK_INDEX = new FixedSizeChunkIndex( CHUNK_SIZE, CHUNK_SIZE * 3, CHUNK_SIZE, CHUNK_SIZE); static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1( - CHUNK_INDEX, false, null, null); + CHUNK_INDEX, false, null); TestObjectFetcher fetcher; @@ -79,7 +79,7 @@ void test(final Map config, final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(config); final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null); - final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range) + final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range, 1) .toInputStream(); if (readFully) { is.readAllBytes(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index c37b9987a..45d484aed 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream; import java.util.NoSuchElementException; +import io.aiven.kafka.tieredstorage.FetchPart; import io.aiven.kafka.tieredstorage.chunkmanager.DefaultChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; @@ -33,6 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -41,7 +44,7 @@ class FetchChunkEnumerationTest { DefaultChunkManager chunkManager; final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 100, 10, 100); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); static final String SEGMENT_KEY_PATH = "topic/segment"; @@ -52,13 +55,13 @@ class FetchChunkEnumerationTest { @Test void failsWhenLargerStartPosition() { // Given - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); // When final int from = 1000; final int to = from + 1; // Then assertThatThrownBy( - () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to))) + () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1)) .hasMessage("Invalid start position " + from + " in segment path topic/segment"); } @@ -71,9 +74,9 @@ void endPositionIsWithinIndex() { final int to = 80; // Then final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - assertThat(fetchChunk.startChunkId).isEqualTo(0); - assertThat(fetchChunk.lastChunkId).isEqualTo(8); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + assertThat(fetchChunk.firstChunk.id).isEqualTo(0); + assertThat(fetchChunk.lastChunk.id).isEqualTo(8); } // - End position outside index @@ -84,10 +87,10 @@ void endPositionIsOutsideIndex() { final int from = 0; final int to = 110; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); // Then - assertThat(fetchChunk.startChunkId).isEqualTo(0); - assertThat(fetchChunk.lastChunkId).isEqualTo(9); + assertThat(fetchChunk.firstChunk.id).isEqualTo(0); + assertThat(fetchChunk.lastChunk.id).isEqualTo(9); } // - Single chunk @@ -98,11 +101,11 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { final int from = 32; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, fetchChunk.currentChunkId)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + when(chunkManager.partChunks(eq(SEGMENT_KEY_PATH), eq(manifest), any())) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then - assertThat(fetchChunk.startChunkId).isEqualTo(fetchChunk.lastChunkId); + assertThat(fetchChunk.firstChunk.id).isEqualTo(fetchChunk.lastChunk.id); assertThat(fetchChunk.nextElement()).hasContent("234"); assertThat(fetchChunk.hasMoreElements()).isFalse(); assertThatThrownBy(fetchChunk::nextElement).isInstanceOf(NoSuchElementException.class); @@ -116,15 +119,19 @@ void shouldReturnRangeFromMultipleChunks() throws StorageBackendException { final int from = 15; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 1)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to), 1); + final var first = new FetchPart(chunkIndex, chunkIndex.chunks().get(0), 1); + final var part1 = first.next().get(); + when(chunkManager.partChunks(SEGMENT_KEY_PATH, manifest, part1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 2)) + final var part2 = part1.next().get(); + when(chunkManager.partChunks(SEGMENT_KEY_PATH, manifest, part2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 3)) + final var part3 = part2.next().get(); + when(chunkManager.partChunks(SEGMENT_KEY_PATH, manifest, part3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then - assertThat(fetchChunk.startChunkId).isNotEqualTo(fetchChunk.lastChunkId); + assertThat(fetchChunk.firstChunk.id).isNotEqualTo(fetchChunk.lastChunk.id); assertThat(fetchChunk.nextElement()).hasContent("56789"); assertThat(fetchChunk.nextElement()).hasContent("0123456789"); assertThat(fetchChunk.nextElement()).hasContent("01234");