diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 49b8c07c5..5257cda30 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -581,7 +581,7 @@ private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOExc + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110}," + "\"compression\":false}"; final Path manifestPath = targetDir.resolve( - objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)); + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()); Files.createDirectories(manifestPath.getParent()); Files.writeString(manifestPath, manifest); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java index 0ba09e107..7d7f22b24 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java @@ -25,6 +25,8 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_KEY; import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_PREFIX; @@ -84,13 +86,11 @@ public ObjectKeyFactory(final String prefix) { * * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata) */ - public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) { + public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) { Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null"); Objects.requireNonNull(suffix, "suffix cannot be null"); - return prefix - + mainPath(remoteLogSegmentMetadata) - + "." + suffix.value; + return new PlainObjectKey(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value); } /** @@ -103,16 +103,16 @@ public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final *

For example: * {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log} */ - public String key(final Map fields, - final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final Suffix suffix) { + public ObjectKey key(final Map fields, + final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final Suffix suffix) { Objects.requireNonNull(fields, "fields cannot be null"); Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null"); Objects.requireNonNull(suffix, "suffix cannot be null"); final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix); final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata)); - return prefix + main + "." + suffix.value; + return new PlainObjectKey(prefix, main + "." + suffix.value); } /** @@ -150,4 +150,44 @@ private static String filenamePrefixFromOffset(final long offset) { nf.setGroupingUsed(false); return nf.format(offset); } + + static class PlainObjectKey implements ObjectKey { + private final String prefix; + private final String mainPathAndSuffix; + + PlainObjectKey(final String prefix, final String mainPathAndSuffix) { + this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null"); + this.mainPathAndSuffix = Objects.requireNonNull(mainPathAndSuffix, "mainPathAndSuffix cannot be null"); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PlainObjectKey that = (PlainObjectKey) o; + return Objects.equals(prefix, that.prefix) + && Objects.equals(mainPathAndSuffix, that.mainPathAndSuffix); + } + + @Override + public int hashCode() { + int result = prefix.hashCode(); + result = 31 * result + mainPathAndSuffix.hashCode(); + return result; + } + + @Override + public String value() { + return prefix + mainPathAndSuffix; + } + + @Override + public String toString() { + return value(); + } + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index cfddd1071..2d9ee8392 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -64,6 +64,7 @@ import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; import io.aiven.kafka.tieredstorage.storage.ObjectDeleter; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.ObjectUploader; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -291,7 +292,7 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet final TransformFinisher transformFinisher, final SegmentCustomMetadataBuilder customMetadataBuilder) throws IOException, StorageBackendException { - final String fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); + final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); try (final var sis = transformFinisher.toInputStream()) { final var bytes = uploader.upload(sis, fileKey); metrics.recordObjectUpload( @@ -322,7 +323,7 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta new TransformFinisher(transformEnum); final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType); - final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); + final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); try (final var in = transformFinisher.toInputStream()) { final var bytes = uploader.upload(in, key); metrics.recordObjectUpload( @@ -341,10 +342,11 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad final SegmentCustomMetadataBuilder customMetadataBuilder) throws StorageBackendException, IOException { final String manifest = mapper.writeValueAsString(segmentManifest); - final String manifestFileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); + final ObjectKey manifestObjectKey = + objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) { - final var bytes = uploader.upload(manifestContent, manifestFileKey); + final var bytes = uploader.upload(manifestContent, manifestObjectKey); metrics.recordObjectUpload( remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), ObjectKeyFactory.Suffix.MANIFEST, @@ -425,9 +427,9 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet } } - private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final ObjectKeyFactory.Suffix suffix) { - final String segmentKey; + private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final ObjectKeyFactory.Suffix suffix) { + final ObjectKey segmentKey; if (remoteLogSegmentMetadata.customMetadata().isPresent()) { final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get(); final var fields = customMetadataSerde.deserialize(customMetadataBytes.value()); @@ -440,7 +442,7 @@ private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws StorageBackendException, IOException { - final String manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); + final ObjectKey manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); return segmentManifestProvider.get(manifestKey); } @@ -457,7 +459,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment try { for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) { - final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); + final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); deleter.delete(key); } } catch (final Exception e) { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKey.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKey.java index 344bb8c7e..cfe1615d5 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKey.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKey.java @@ -19,14 +19,16 @@ import java.nio.file.Path; import java.util.Objects; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + public class ChunkKey { public final String segmentFileName; public final int chunkId; - public ChunkKey(final String objectKeyPath, final int chunkId) { - Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null"); + public ChunkKey(final ObjectKey objectKey, final int chunkId) { + Objects.requireNonNull(objectKey, "objectKey cannot be null"); // get last part of segment path + chunk id, as it's used for creating file names - this.segmentFileName = Path.of(objectKeyPath).getFileName().toString(); + this.segmentFileName = Path.of(objectKey.value()).getFileName().toString(); this.chunkId = chunkId; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java index dde21c86a..849941dc9 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java @@ -20,11 +20,12 @@ import java.io.InputStream; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; public interface ChunkManager { - InputStream getChunk(final String objectKeyPath, + InputStream getChunk(final ObjectKey objectKeyPath, final SegmentManifest manifest, final int chunkId) throws StorageBackendException, IOException; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java index 5310476d0..af1570cf3 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java @@ -25,6 +25,7 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration; import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration; @@ -46,7 +47,7 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi * * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed). */ - public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest, + public InputStream getChunk(final ObjectKey objectKeyPath, final SegmentManifest manifest, final int chunkId) throws StorageBackendException { final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java index bea267398..12e625f5f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java @@ -33,6 +33,7 @@ import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import com.github.benmanes.caffeine.cache.AsyncCache; @@ -65,7 +66,7 @@ protected ChunkCache(final ChunkManager chunkManager) { * opened right when fetching from cache happens even if the actual value is removed from the cache, * the InputStream will still contain the data. */ - public InputStream getChunk(final String objectKeyPath, + public InputStream getChunk(final ObjectKey objectKeyPath, final SegmentManifest manifest, final int chunkId) throws StorageBackendException, IOException { final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java index af2a76d5d..c8f8eb885 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -37,7 +38,7 @@ public class SegmentManifestProvider { private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache"; private static final long GET_TIMEOUT_SEC = 10; - private final AsyncLoadingCache cache; + private final AsyncLoadingCache cache; /** * @param maxCacheSize the max cache size (in items) or empty if the cache is unbounded. @@ -62,7 +63,7 @@ public SegmentManifestProvider(final Optional maxCacheSize, statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize); } - public SegmentManifest get(final String manifestKey) + public SegmentManifest get(final ObjectKey manifestKey) throws StorageBackendException, IOException { try { return cache.get(manifestKey).get(GET_TIMEOUT_SEC, TimeUnit.SECONDS); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java index 9531b0ca0..922a1574d 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java @@ -30,13 +30,14 @@ import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.apache.commons.io.input.BoundedInputStream; public class FetchChunkEnumeration implements Enumeration { private final ChunkManager chunkManager; - private final String objectKeyPath; + private final ObjectKey objectKey; private final SegmentManifest manifest; private final BytesRange range; final int startChunkId; @@ -48,16 +49,16 @@ public class FetchChunkEnumeration implements Enumeration { /** * * @param chunkManager provides chunk input to fetch from - * @param objectKeyPath required by chunkManager + * @param objectKey required by chunkManager * @param manifest provides to index to build response from * @param range original offset range start/end position */ public FetchChunkEnumeration(final ChunkManager chunkManager, - final String objectKeyPath, + final ObjectKey objectKey, final SegmentManifest manifest, final BytesRange range) { this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null"); - this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null"); + this.objectKey = Objects.requireNonNull(objectKey, "objectKey cannot be null"); this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null"); this.range = Objects.requireNonNull(range, "range cannot be null"); @@ -74,7 +75,7 @@ private Chunk getFirstChunk(final int fromPosition) { final Chunk firstChunk = chunkIndex.findChunkForOriginalOffset(fromPosition); if (firstChunk == null) { throw new IllegalArgumentException("Invalid start position " + fromPosition - + " in segment path " + objectKeyPath); + + " in segment path " + objectKey); } return firstChunk; } @@ -137,7 +138,7 @@ public InputStream nextElement() { private InputStream getChunkContent(final int chunkId) { try { - return chunkManager.getChunk(objectKeyPath, manifest, chunkId); + return chunkManager.getChunk(objectKey, manifest, chunkId); } catch (final KeyNotFoundException e) { throw new KeyNotFoundRuntimeException(e); } catch (final StorageBackendException | IOException e) { diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java index 50a4ec5e7..88537aa7d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java @@ -43,31 +43,32 @@ class ObjectKeyFactoryTest { @Test void test() { final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()) .isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); @@ -78,37 +79,38 @@ void withCustomFieldsEmpty() { final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); final Map fields = Map.of(); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value() ).isEqualTo( "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); @@ -119,37 +121,38 @@ void withCustomFieldsOnlyPrefix() { final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value() ).isEqualTo( "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); @@ -160,25 +163,26 @@ void withCustomFieldsOnlyKey() { final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() ).isEqualTo("prefix/topic/7/file.log"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value() ).isEqualTo("prefix/topic/7/file.index"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value() ).isEqualTo("prefix/topic/7/file.timeindex"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value() ).isEqualTo("prefix/topic/7/file.snapshot"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value() ).isEqualTo("prefix/topic/7/file.txnindex"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .value() ).isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value() ).isEqualTo("prefix/topic/7/file.rsm-manifest"); } @@ -189,32 +193,33 @@ void withCustomFieldsAll() { SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/", SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value() ).isEqualTo("other/topic/7/file.log"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value() ).isEqualTo("other/topic/7/file.index"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value() ).isEqualTo("other/topic/7/file.timeindex"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value() ).isEqualTo("other/topic/7/file.snapshot"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value() ).isEqualTo("other/topic/7/file.txnindex"); assertThat( objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .value() ).isEqualTo("other/topic/7/file.leader-epoch-checkpoint"); assertThat( - objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value() ).isEqualTo("other/topic/7/file.rsm-manifest"); } @Test void nullPrefix() { final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null); - assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)) + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()) .isEqualTo( "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/PlainObjectKeyEqualsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/PlainObjectKeyEqualsTest.java new file mode 100644 index 000000000..47802f4ff --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/PlainObjectKeyEqualsTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class PlainObjectKeyEqualsTest { + @Test + void identical() { + final var k1 = new ObjectKeyFactory.PlainObjectKey("prefix", "mainPathAndSuffix"); + final var k2 = new ObjectKeyFactory.PlainObjectKey("prefix", "mainPathAndSuffix"); + assertThat(k1).isEqualTo(k2); + assertThat(k2).isEqualTo(k1); + assertThat(k1).hasSameHashCodeAs(k2); + } + + @Test + void differentPrefix() { + final var k1 = new ObjectKeyFactory.PlainObjectKey("prefix1", "mainPathAndSuffix"); + final var k2 = new ObjectKeyFactory.PlainObjectKey("prefix2", "mainPathAndSuffix"); + assertThat(k1).isNotEqualTo(k2); + assertThat(k2).isNotEqualTo(k1); + assertThat(k1).doesNotHaveSameHashCodeAs(k2); + } + + @Test + void differentMainPathAndSuffix() { + final var k1 = new ObjectKeyFactory.PlainObjectKey("prefix", "mainPathAndSuffix1"); + final var k2 = new ObjectKeyFactory.PlainObjectKey("prefix", "mainPathAndSuffix2"); + assertThat(k1).isNotEqualTo(k2); + assertThat(k2).isNotEqualTo(k1); + assertThat(k1).doesNotHaveSameHashCodeAs(k2); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKeyTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKeyTest.java index 079507148..7655a90f8 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKeyTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkKeyTest.java @@ -28,8 +28,8 @@ class ChunkKeyTest { @Test void identical() { - final var ck1 = new ChunkKey(UUID_1, 0); - final var ck2 = new ChunkKey(UUID_1, 0); + final var ck1 = new ChunkKey(() -> UUID_1, 0); + final var ck2 = new ChunkKey(() -> UUID_1, 0); assertThat(ck1).isEqualTo(ck2); assertThat(ck2).isEqualTo(ck1); assertThat(ck1).hasSameHashCodeAs(ck2); @@ -37,8 +37,8 @@ void identical() { @Test void differentUuid() { - final var ck1 = new ChunkKey(UUID_1, 0); - final var ck2 = new ChunkKey(UUID_2, 0); + final var ck1 = new ChunkKey(() -> UUID_1, 0); + final var ck2 = new ChunkKey(() -> UUID_2, 0); assertThat(ck1).isNotEqualTo(ck2); assertThat(ck2).isNotEqualTo(ck1); assertThat(ck1).doesNotHaveSameHashCodeAs(ck2); @@ -46,8 +46,8 @@ void differentUuid() { @Test void differentChunkIds() { - final var ck1 = new ChunkKey(UUID_1, 0); - final var ck2 = new ChunkKey(UUID_1, 1); + final var ck1 = new ChunkKey(() -> UUID_1, 0); + final var ck2 = new ChunkKey(() -> UUID_1, 1); assertThat(ck1).isNotEqualTo(ck2); assertThat(ck2).isNotEqualTo(ck1); assertThat(ck1).doesNotHaveSameHashCodeAs(ck2); @@ -55,11 +55,11 @@ void differentChunkIds() { @Test void singlePath() { - assertThat(new ChunkKey("test", 0).path()).isEqualTo("test-0"); + assertThat(new ChunkKey(() -> "test", 0).path()).isEqualTo("test-0"); } @Test void pathWitDir() { - assertThat(new ChunkKey("parent/test", 0).path()).isEqualTo("test-0"); + assertThat(new ChunkKey(() -> "parent/test", 0).path()).isEqualTo("test-0"); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index 6335a8763..0d58aa94d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import com.github.luben.zstd.ZstdCompressCtx; @@ -42,7 +43,7 @@ @ExtendWith(MockitoExtension.class) class DefaultChunkManagerTest extends AesKeyAwareTest { - static final String OBJECT_KEY_PATH = "topic/segment.log"; + static final ObjectKey OBJECT_KEY = () -> "topic/segment.log"; static final byte[] TEST_CHUNK_CONTENT = "0123456789".getBytes(); @Mock private StorageBackend storage; @@ -53,11 +54,11 @@ void testGetChunk() throws Exception { final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasContent("0123456789"); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasContent("0123456789"); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @Test @@ -73,15 +74,15 @@ void testGetChunkWithEncryption() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, encrypted.length, encrypted.length); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())).thenReturn( + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())).thenReturn( new ByteArrayInputStream(encrypted)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad), null); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @Test @@ -94,13 +95,13 @@ void testGetChunkWithCompression() throws Exception { } final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, compressed.length, compressed.length); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java index 76e3ea4d4..d5460edc8 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -49,7 +50,7 @@ class ChunkCacheMetricsTest { static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); - public static final String OBJECT_KEY_PATH = "topic/segment"; + public static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment"; @TempDir static Path baseCachePath; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java index f759895b7..d4e2db78a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java @@ -28,6 +28,7 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import com.github.benmanes.caffeine.cache.RemovalCause; @@ -68,6 +69,8 @@ class ChunkCacheTest { new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, false, null, null); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; private static final String SEGMENT_KEY = "topic/segment"; + private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY; + @Mock private ChunkManager chunkManager; private ChunkCache chunkCache; @@ -90,9 +93,9 @@ class CacheTests { @BeforeEach void setUp() throws Exception { doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); } @@ -103,17 +106,17 @@ void noEviction() throws IOException, StorageBackendException { "size", "-1" )); - final InputStream chunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + final InputStream chunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThat(chunk0).hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - final InputStream chunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + final InputStream chunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); assertThat(chunk1).hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); + final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -127,19 +130,19 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte "size", "-1" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); Thread.sleep(100); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -152,9 +155,9 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte any(), eq(RemovalCause.EXPIRED)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager, times(2)).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager, times(2)).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); } @Test @@ -164,16 +167,16 @@ void sizeBasedEviction() throws IOException, StorageBackendException { "size", "18" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); await().atMost(Duration.ofMillis(5000)) .pollDelay(Duration.ofSeconds(2)) @@ -182,11 +185,11 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager, times(3)).getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt()); + verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt()); } } @@ -205,32 +208,32 @@ void setUp() { @Test void failedFetching() throws Exception { - when(chunkManager.getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt())) + when(chunkManager.getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt())) .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(StorageBackendException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .isInstanceOf(IOException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithInterruptedException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .thenReturn(new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new InterruptedException(TEST_EXCEPTION_MESSAGE); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(InterruptedException.class) @@ -239,15 +242,15 @@ void failedReadingCachedValueWithInterruptedException() throws Exception { @Test void failedReadingCachedValueWithExecutionException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(RuntimeException.class) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java index 96c0a0e56..980ffbb71 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java @@ -24,6 +24,7 @@ import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; @@ -51,6 +52,8 @@ @ExtendWith(MockitoExtension.class) class DiskBasedChunkCacheTest { public static final String SEGMENT_ID = "topic/segment"; + public static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_ID; + private static final byte[] CHUNK_0 = "0123456789".getBytes(); private static final byte[] CHUNK_1 = "1011121314".getBytes(); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; @@ -82,8 +85,8 @@ void failedToMoveFromTempCache() { final ByteArrayInputStream chunkStream0 = new ByteArrayInputStream(CHUNK_0); final ByteArrayInputStream chunkStream1 = new ByteArrayInputStream(CHUNK_1); - final ChunkKey chunkKey0 = new ChunkKey(SEGMENT_ID, 0); - final ChunkKey chunkKey1 = new ChunkKey(SEGMENT_ID, 1); + final ChunkKey chunkKey0 = new ChunkKey(SEGMENT_OBJECT_KEY, 0); + final ChunkKey chunkKey1 = new ChunkKey(SEGMENT_OBJECT_KEY, 1); assertThatThrownBy(() -> diskBasedChunkCache.cacheChunk(chunkKey0, chunkStream0)) .isInstanceOf(IOException.class) @@ -109,8 +112,8 @@ void failedToMoveFromTempCache() { void cacheChunks() throws IOException { final ByteArrayInputStream chunkStream0 = new ByteArrayInputStream(CHUNK_0); final ByteArrayInputStream chunkStream1 = new ByteArrayInputStream(CHUNK_1); - final ChunkKey chunkKey0 = new ChunkKey(SEGMENT_ID, 0); - final ChunkKey chunkKey1 = new ChunkKey(SEGMENT_ID, 1); + final ChunkKey chunkKey0 = new ChunkKey(SEGMENT_OBJECT_KEY, 0); + final ChunkKey chunkKey1 = new ChunkKey(SEGMENT_OBJECT_KEY, 1); final Path cachedChunkPath0 = diskBasedChunkCache.cacheChunk(chunkKey0, chunkStream0); final Path cachedChunkPath1 = diskBasedChunkCache.cacheChunk(chunkKey1, chunkStream1); @@ -140,7 +143,7 @@ void failsToReadFile() { @Test void weighsCorrectly() throws IOException { final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0); - final ChunkKey chunkKey = new ChunkKey(SEGMENT_ID, 0); + final ChunkKey chunkKey = new ChunkKey(SEGMENT_OBJECT_KEY, 0); final Path cachedChunkPath = diskBasedChunkCache.cacheChunk(chunkKey, chunkStream); @@ -151,7 +154,7 @@ void weighsCorrectly() throws IOException { @Test void weighingTooBigFiles() throws IOException { final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0); - final ChunkKey chunkKey = new ChunkKey(SEGMENT_ID, 0); + final ChunkKey chunkKey = new ChunkKey(SEGMENT_OBJECT_KEY, 0); final Path cachedChunkPath = diskBasedChunkCache.cacheChunk(chunkKey, chunkStream); @@ -166,7 +169,7 @@ void weighingTooBigFiles() throws IOException { @Test void weighingFails() throws IOException { final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0); - final ChunkKey chunkKey = new ChunkKey(SEGMENT_ID, 0); + final ChunkKey chunkKey = new ChunkKey(SEGMENT_OBJECT_KEY, 0); final Path cachedChunkPath = diskBasedChunkCache.cacheChunk(chunkKey, chunkStream); try (final MockedStatic filesMockedStatic = mockStatic(Files.class, CALLS_REAL_METHODS)) { @@ -183,7 +186,7 @@ void weighingFails() throws IOException { @Test void removesCorrectly() throws IOException { final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0); - final ChunkKey chunkKey = new ChunkKey(SEGMENT_ID, 0); + final ChunkKey chunkKey = new ChunkKey(SEGMENT_OBJECT_KEY, 0); final Path cachedChunkPath = diskBasedChunkCache.cacheChunk(chunkKey, chunkStream); @@ -195,7 +198,7 @@ void removesCorrectly() throws IOException { @Test void removalFails() throws IOException { final ByteArrayInputStream chunkStream = new ByteArrayInputStream(CHUNK_0); - final ChunkKey chunkKey = new ChunkKey(SEGMENT_ID, 0); + final ChunkKey chunkKey = new ChunkKey(SEGMENT_OBJECT_KEY, 0); final Path cachedChunkPath = diskBasedChunkCache.cacheChunk(chunkKey, chunkStream); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java index 9b255b743..f353f4566 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -37,22 +38,22 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { return 0; } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { return null; } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { return null; } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java index b81e3fdd6..059f78157 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java @@ -25,6 +25,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -39,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -48,7 +49,7 @@ @ExtendWith(MockitoExtension.class) class SegmentManifestProviderTest { static final ObjectMapper MAPPER = new ObjectMapper(); - public static final String MANIFEST_KEY = "topic/manifest"; + public static final ObjectKey MANIFEST_KEY = () -> "topic/manifest"; static { MAPPER.registerModule(new Jdk8Module()); @@ -91,7 +92,8 @@ void withoutRetentionLimitsShouldBeCreated() { @Test void shouldReturnAndCache() throws StorageBackendException, IOException { - final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"; + final ObjectKey key = + () -> "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"; when(storage.fetch(key)) .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); final SegmentManifestV1 expectedManifest = new SegmentManifestV1( @@ -106,7 +108,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException { @Test void shouldPropagateStorageBackendException() throws StorageBackendException { - when(storage.fetch(anyString())) + when(storage.fetch(any())) .thenThrow(new StorageBackendException("test")); assertThatThrownBy(() -> provider.get(MANIFEST_KEY)) .isInstanceOf(StorageBackendException.class) @@ -118,7 +120,7 @@ void shouldPropagateIOException(@Mock final InputStream isMock) throws StorageBa doAnswer(invocation -> { throw new IOException("test"); }).when(isMock).close(); - when(storage.fetch(anyString())) + when(storage.fetch(any())) .thenReturn(isMock); assertThatThrownBy(() -> provider.get(MANIFEST_KEY)) .isInstanceOf(IOException.class) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 9fb048a53..028b87c76 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -33,6 +33,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.junit.jupiter.api.BeforeEach; @@ -49,7 +50,7 @@ @ExtendWith(MockitoExtension.class) class FetchChunkEnumerationSourceInputStreamClosingTest { - static final String OBJECT_KEY_PATH = "topic/segment"; + static final ObjectKey OBJECT_KEY = () -> "topic/segment"; static final int CHUNK_SIZE = 10; static final BytesRange RANGE1 = BytesRange.ofFromPositionAndSize(0, CHUNK_SIZE); @@ -79,7 +80,7 @@ void test(final Map config, final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(config); final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null); - final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range) + final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY, SEGMENT_MANIFEST, range) .toInputStream(); if (readFully) { is.readAllBytes(); @@ -132,13 +133,13 @@ private static class TestObjectFetcher implements ObjectFetcher { private final List openInputStreams = new ArrayList<>(); @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { throw new RuntimeException("Should not be called"); } @Override - public InputStream fetch(final String key, final BytesRange range) { - if (!key.equals(OBJECT_KEY_PATH)) { + public InputStream fetch(final ObjectKey key, final BytesRange range) { + if (!key.equals(OBJECT_KEY)) { throw new IllegalArgumentException("Invalid key: " + key); } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index c37b9987a..65e331c83 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -24,7 +24,9 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; +import io.aiven.kafka.tieredstorage.storage.TestObjectKey; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -44,7 +46,7 @@ class FetchChunkEnumerationTest { final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); - static final String SEGMENT_KEY_PATH = "topic/segment"; + static final ObjectKey SEGMENT_KEY = new TestObjectKey("topic/segment"); // Test scenarios // - Initialization @@ -58,7 +60,7 @@ void failsWhenLargerStartPosition() { final int to = from + 1; // Then assertThatThrownBy( - () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to))) + () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to))) .hasMessage("Invalid start position " + from + " in segment path topic/segment"); } @@ -71,7 +73,7 @@ void endPositionIsWithinIndex() { final int to = 80; // Then final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); assertThat(fetchChunk.startChunkId).isEqualTo(0); assertThat(fetchChunk.lastChunkId).isEqualTo(8); } @@ -84,7 +86,7 @@ void endPositionIsOutsideIndex() { final int from = 0; final int to = 110; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); // Then assertThat(fetchChunk.startChunkId).isEqualTo(0); assertThat(fetchChunk.lastChunkId).isEqualTo(9); @@ -98,8 +100,8 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { final int from = 32; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, fetchChunk.currentChunkId)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); + when(chunkManager.getChunk(SEGMENT_KEY, manifest, fetchChunk.currentChunkId)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchChunk.startChunkId).isEqualTo(fetchChunk.lastChunkId); @@ -116,12 +118,12 @@ void shouldReturnRangeFromMultipleChunks() throws StorageBackendException { final int from = 15; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 1)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 2)) + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 3)) + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchChunk.startChunkId).isNotEqualTo(fetchChunk.lastChunkId); diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java index 0ed538e24..e622dddfa 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java @@ -17,15 +17,15 @@ package io.aiven.kafka.tieredstorage.storage; public class KeyNotFoundException extends StorageBackendException { - public KeyNotFoundException(final StorageBackend storage, final String key) { + public KeyNotFoundException(final StorageBackend storage, final ObjectKey key) { super(getMessage(storage, key)); } - public KeyNotFoundException(final StorageBackend storage, final String key, final Exception e) { + public KeyNotFoundException(final StorageBackend storage, final ObjectKey key, final Exception e) { super(getMessage(storage, key), e); } - private static String getMessage(final StorageBackend storage, final String key) { + private static String getMessage(final StorageBackend storage, final ObjectKey key) { return "Key " + key + " does not exists in storage " + storage; } } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java index 6976cce93..1b0ad1268 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java @@ -22,5 +22,5 @@ public interface ObjectDeleter { * *

If the object doesn't exist, the operation still succeeds as it is idempotent. */ - void delete(String key) throws StorageBackendException; + void delete(ObjectKey key) throws StorageBackendException; } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java index 309745f41..7c672ceeb 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java @@ -23,12 +23,12 @@ public interface ObjectFetcher { * Fetch file. * @param key file key. */ - InputStream fetch(String key) throws StorageBackendException; + InputStream fetch(ObjectKey key) throws StorageBackendException; /** * Fetch file. * @param key file key. * @param range range with inclusive start/end positions */ - InputStream fetch(String key, BytesRange range) throws StorageBackendException; + InputStream fetch(ObjectKey key, BytesRange range) throws StorageBackendException; } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java new file mode 100644 index 000000000..d5374bd2e --- /dev/null +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java @@ -0,0 +1,21 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage; + +public interface ObjectKey { + String value(); +} diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java index a31366afe..14200861f 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java @@ -24,5 +24,5 @@ public interface ObjectUploader { * @param key path to an object within a storage backend. * @return number of bytes uploaded */ - long upload(InputStream inputStream, String key) throws StorageBackendException; + long upload(InputStream inputStream, ObjectKey key) throws StorageBackendException; } diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 9933e897f..8c0861837 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -28,7 +28,7 @@ public abstract class BaseStorageTest { - protected static final String TOPIC_PARTITION_SEGMENT_KEY = "topic/partition/log"; + protected static final ObjectKey TOPIC_PARTITION_SEGMENT_KEY = new TestObjectKey("topic/partition/log"); protected abstract StorageBackend storage(); @@ -80,10 +80,10 @@ void testRetryUploadKeepLatestVersion() throws StorageBackendException { @Test void testFetchFailWhenNonExistingKey() { - assertThatThrownBy(() -> storage().fetch("non-existing")) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); - assertThatThrownBy(() -> storage().fetch("non-existing", BytesRange.of(0, 1))) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"), BytesRange.of(0, 1))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); } @@ -146,10 +146,10 @@ void testFetchWithRangeOutsideFileSize() throws StorageBackendException { @Test void testFetchNonExistingKey() { - assertThatThrownBy(() -> storage().fetch("non-existing")) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); - assertThatThrownBy(() -> storage().fetch("non-existing", BytesRange.of(0, 1))) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"), BytesRange.of(0, 1))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); } diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java new file mode 100644 index 000000000..5e04bb455 --- /dev/null +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage; + +import java.util.Objects; + +public class TestObjectKey implements ObjectKey { + private final String key; + + public TestObjectKey(final String key) { + this.key = Objects.requireNonNull(key, "key cannot be null"); + } + + @Override + public String value() { + return key; + } + + @Override + public String toString() { + return key; + } +} diff --git a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java index 120dbb0b2..5442b18bb 100644 --- a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java +++ b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -47,9 +48,9 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); Files.createDirectories(path.getParent()); Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING); return Files.size(path); @@ -59,9 +60,9 @@ public long upload(final InputStream inputStream, final String key) throws Stora } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); return Files.newInputStream(path); } catch (final NoSuchFileException e) { throw new KeyNotFoundException(this, key, e); @@ -71,9 +72,9 @@ public InputStream fetch(final String key) throws StorageBackendException { } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); final long fileSize = Files.size(path); if (range.from >= fileSize) { throw new InvalidRangeException("Range start position " + range.from @@ -92,9 +93,9 @@ public InputStream fetch(final String key, final BytesRange range) throws Storag } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); Files.deleteIfExists(path); Path parent = path.getParent(); while (parent != null && Files.isDirectory(parent) && !parent.equals(fsRoot) diff --git a/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java b/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java index 653010052..643fac2e7 100644 --- a/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java +++ b/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java @@ -71,7 +71,7 @@ void testRootCannotBeNonWritableDirectory() throws IOException { @Test void testDeleteAllParentsButRoot() throws IOException, StorageBackendException { - final Path keyPath = root.resolve(TOPIC_PARTITION_SEGMENT_KEY); + final Path keyPath = root.resolve(TOPIC_PARTITION_SEGMENT_KEY.value()); Files.createDirectories(keyPath.getParent()); Files.writeString(keyPath, "test"); final FileSystemStorage storage = new FileSystemStorage(); @@ -95,7 +95,7 @@ void testDeleteDoesNotRemoveParentDir() throws IOException, StorageBackendExcept Files.writeString(keyPath, "test"); final FileSystemStorage storage = new FileSystemStorage(); storage.configure(Map.of("root", root.toString())); - storage.delete(parent + "/" + key); + storage.delete(() -> parent + "/" + key); assertThat(keyPath).doesNotExist(); assertThat(parentPath).exists(); diff --git a/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java b/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java index 1dd9bca03..8397d259a 100644 --- a/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java +++ b/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java @@ -27,6 +27,7 @@ import java.util.Map; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; @@ -94,7 +95,7 @@ void setUp(final TestInfo testInfo) { void metricsShouldBeReported() throws StorageBackendException, IOException, JMException { final byte[] data = new byte[RESUMABLE_UPLOAD_CHUNK_SIZE + 1]; - final String key = "x"; + final ObjectKey key = () -> "x"; storage.upload(new ByteArrayInputStream(data), key); try (final InputStream fetch = storage.fetch(key)) { diff --git a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java index 12d40037a..198c088ef 100644 --- a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java +++ b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java @@ -24,6 +24,7 @@ import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -57,9 +58,9 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { try { - final BlobInfo blobInfo = BlobInfo.newBuilder(this.bucketName, key).build(); + final BlobInfo blobInfo = BlobInfo.newBuilder(this.bucketName, key.value()).build(); final Blob blob; if (resumableUploadChunkSize != null) { blob = storage.createFrom(blobInfo, inputStream, resumableUploadChunkSize); @@ -73,16 +74,16 @@ public long upload(final InputStream inputStream, final String key) throws Stora } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { try { - storage.delete(this.bucketName, key); + storage.delete(this.bucketName, key.value()); } catch (final StorageException e) { throw new StorageBackendException("Failed to delete " + key, e); } } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { try { final Blob blob = getBlob(key); final ReadChannel reader = blob.reader(); @@ -93,7 +94,7 @@ public InputStream fetch(final String key) throws StorageBackendException { } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { try { final Blob blob = getBlob(key); @@ -118,11 +119,11 @@ public InputStream fetch(final String key, final BytesRange range) throws Storag } } - private Blob getBlob(final String key) throws KeyNotFoundException { + private Blob getBlob(final ObjectKey key) throws KeyNotFoundException { // Unfortunately, it seems Google will do two a separate (HEAD-like) call to get blob metadata. // Since the blobs are immutable in tiered storage, we can consider caching them locally // to avoid the extra round trip. - final Blob blob = storage.get(this.bucketName, key); + final Blob blob = storage.get(this.bucketName, key.value()); if (blob == null) { throw new KeyNotFoundException(this, key); } diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java index decb99b24..35a577185 100644 --- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java @@ -84,7 +84,7 @@ void testS3ServerExceptions(final int statusCode, .withHeader(CONTENT_TYPE, APPLICATION_XML.getMimeType()) .withBody(String.format(ERROR_RESPONSE_TEMPLATE, statusCode)))); final S3Exception s3Exception = catchThrowableOfType( - () -> storage.upload(InputStream.nullInputStream(), "key"), + () -> storage.upload(InputStream.nullInputStream(), () -> "key"), S3Exception.class); assertThat(s3Exception.statusCode()).isEqualTo(statusCode); @@ -115,7 +115,7 @@ void testOtherExceptions(final WireMockRuntimeInfo wmRuntimeInfo) throws Excepti .withHeader(CONTENT_TYPE, APPLICATION_XML.getMimeType()) .withBody("unparsable_xml"))); - assertThatThrownBy(() -> storage.upload(InputStream.nullInputStream(), "key")) + assertThatThrownBy(() -> storage.upload(InputStream.nullInputStream(), () -> "key")) .isInstanceOf(SdkClientException.class) .hasMessage("Could not parse XML response."); @@ -140,7 +140,7 @@ void apiCallAttemptTimeout(final WireMockRuntimeInfo wmRuntimeInfo) throws Excep stubFor(any(anyUrl()).willReturn(aResponse().withFixedDelay(100))); - assertThatThrownBy(() -> storage.fetch("key")) + assertThatThrownBy(() -> storage.fetch(() -> "key")) .isInstanceOf(ApiCallAttemptTimeoutException.class) .hasMessage("HTTP request execution did not complete before the specified timeout configuration: 1 millis"); @@ -169,7 +169,7 @@ void ioErrors(final WireMockRuntimeInfo wmRuntimeInfo) throws Exception { .withStatus(HttpStatusCode.OK) .withFault(Fault.RANDOM_DATA_THEN_CLOSE))); - assertThatThrownBy(() -> storage.fetch("key")) + assertThatThrownBy(() -> storage.fetch(() -> "key")) .isInstanceOf(SdkClientException.class) .hasMessage("Unable to execute HTTP request: null"); diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java index fe1bbf40a..bde27a6cc 100644 --- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java @@ -26,6 +26,7 @@ import java.util.Map; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -99,7 +100,7 @@ void setupStorage() { void metricsShouldBeReported() throws Exception { final byte[] data = new byte[PART_SIZE + 1]; - final String key = "x"; + final ObjectKey key = () -> "x"; storage.upload(new ByteArrayInputStream(data), key); try (final InputStream fetch = storage.fetch(key)) { diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java index 1e2d430ad..f6ae59aa6 100644 --- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java @@ -90,7 +90,7 @@ protected StorageBackend storage() { @Test void partSizePassedToStream() throws IOException { - try (final var os = ((S3Storage) storage()).s3OutputStream("test")) { + try (final var os = ((S3Storage) storage()).s3OutputStream(() -> "test")) { assertThat(os.partSize).isEqualTo(PART_SIZE); } } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java index fa71f82dc..af5806698 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.List; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.sync.RequestBody; @@ -52,7 +54,7 @@ public class S3MultiPartOutputStream extends OutputStream { private final S3Client client; private final ByteBuffer partBuffer; private final String bucketName; - private final String key; + private final ObjectKey key; final int partSize; private final String uploadId; @@ -62,7 +64,7 @@ public class S3MultiPartOutputStream extends OutputStream { private long processedBytes; public S3MultiPartOutputStream(final String bucketName, - final String key, + final ObjectKey key, final int partSize, final S3Client client) { this.bucketName = bucketName; @@ -71,7 +73,7 @@ public S3MultiPartOutputStream(final String bucketName, this.partSize = partSize; this.partBuffer = ByteBuffer.allocate(partSize); final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName) - .key(key).build(); + .key(key.value()).build(); final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest); log.debug("Create new multipart upload request: {}", initiateResult.uploadId()); this.uploadId = initiateResult.uploadId(); @@ -147,7 +149,7 @@ private void completeUpload() { .build(); final var request = CompleteMultipartUploadRequest.builder() .bucket(bucketName) - .key(key) + .key(key.value()) .uploadId(uploadId) .multipartUpload(completedMultipartUpload) .build(); @@ -158,7 +160,7 @@ private void completeUpload() { private void abortUpload() { final var request = AbortMultipartUploadRequest.builder() .bucket(bucketName) - .key(key) + .key(key.value()) .uploadId(uploadId) .build(); client.abortMultipartUpload(request); @@ -177,7 +179,7 @@ private void uploadPart(final InputStream in, final int actualPartSize) { final UploadPartRequest uploadPartRequest = UploadPartRequest.builder() .bucket(bucketName) - .key(key) + .key(key.value()) .uploadId(uploadId) .partNumber(partNumber) .build(); diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index f31ef8a02..5f5d1b2b5 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -23,6 +23,7 @@ import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -46,7 +47,7 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { try (final var out = s3OutputStream(key)) { inputStream.transferTo(out); return out.processedBytes(); @@ -55,14 +56,14 @@ public long upload(final InputStream inputStream, final String key) throws Stora } } - S3MultiPartOutputStream s3OutputStream(final String key) { + S3MultiPartOutputStream s3OutputStream(final ObjectKey key) { return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client); } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { try { - final var deleteRequest = DeleteObjectRequest.builder().bucket(bucketName).key(key).build(); + final var deleteRequest = DeleteObjectRequest.builder().bucket(bucketName).key(key.value()).build(); s3Client.deleteObject(deleteRequest); } catch (final AwsServiceException e) { throw new StorageBackendException("Failed to delete " + key, e); @@ -70,8 +71,8 @@ public void delete(final String key) throws StorageBackendException { } @Override - public InputStream fetch(final String key) throws StorageBackendException { - final GetObjectRequest getRequest = GetObjectRequest.builder().bucket(bucketName).key(key).build(); + public InputStream fetch(final ObjectKey key) throws StorageBackendException { + final GetObjectRequest getRequest = GetObjectRequest.builder().bucket(bucketName).key(key.value()).build(); try { return s3Client.getObject(getRequest); } catch (final AwsServiceException e) { @@ -84,11 +85,11 @@ public InputStream fetch(final String key) throws StorageBackendException { } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { try { final GetObjectRequest getRequest = GetObjectRequest.builder() .bucket(bucketName) - .key(key) + .key(key.value()) .range(range.toString()) .build(); return s3Client.getObject(getRequest); diff --git a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java index 51c44ebc4..3cdda9a49 100644 --- a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java +++ b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Random; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -51,7 +53,7 @@ @ExtendWith(MockitoExtension.class) class S3MultiPartOutputStreamTest { private static final String BUCKET_NAME = "some_bucket"; - private static final String FILE_KEY = "some_key"; + private static final ObjectKey FILE_KEY = () -> "some_key"; private static final String UPLOAD_ID = "some_upload_id"; @Mock @@ -327,7 +329,7 @@ private static void assertUploadPartRequest(final UploadPartRequest uploadPartRe assertThat(uploadPartRequest.uploadId()).isEqualTo(UPLOAD_ID); assertThat(uploadPartRequest.partNumber()).isEqualTo(expectedPartNumber); assertThat(uploadPartRequest.bucket()).isEqualTo(BUCKET_NAME); - assertThat(uploadPartRequest.key()).isEqualTo(FILE_KEY); + assertThat(uploadPartRequest.key()).isEqualTo(FILE_KEY.value()); assertThat(requestBody.optionalContentLength()).hasValue(expectedPartSize); assertThat(requestBody.contentStreamProvider().newStream()).hasBinaryContent(expectedBytes); } @@ -335,14 +337,14 @@ private static void assertUploadPartRequest(final UploadPartRequest uploadPartRe private static void assertCompleteMultipartUploadRequest(final CompleteMultipartUploadRequest request, final List expectedETags) { assertThat(request.bucket()).isEqualTo(BUCKET_NAME); - assertThat(request.key()).isEqualTo(FILE_KEY); + assertThat(request.key()).isEqualTo(FILE_KEY.value()); assertThat(request.uploadId()).isEqualTo(UPLOAD_ID); assertThat(request.multipartUpload().parts()).containsExactlyElementsOf(expectedETags); } private static void assertAbortMultipartUploadRequest(final AbortMultipartUploadRequest request) { assertThat(request.bucket()).isEqualTo(BUCKET_NAME); - assertThat(request.key()).isEqualTo(FILE_KEY); + assertThat(request.key()).isEqualTo(FILE_KEY.value()); assertThat(request.uploadId()).isEqualTo(UPLOAD_ID); } }