diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
index b7fd1f4ef..6c74f2e38 100644
--- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
+++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
@@ -480,14 +480,15 @@ void testFetchingSegmentFileNonExistent() throws IOException {
         );
         rsm.configure(config);
 
-        final ObjectKey objectKey = new ObjectKey("");
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
 
         // Ensure the manifest exists.
-        writeManifest(objectKey);
+        writeManifest(objectKeyFactory);
 
         // Make sure the exception is connected to the log file.
-        final String expectedMessage =
-            "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage";
+        final String expectedMessage = "Key "
+            + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+            + " does not exists in storage";
 
         assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
             .isInstanceOf(RemoteResourceNotFoundException.class)
@@ -509,9 +510,10 @@ void testFetchingSegmentManifestNotFound() {
         rsm.configure(config);
 
         // Make sure the exception is connected to the manifest file.
-        final ObjectKey objectKey = new ObjectKey("");
-        final String expectedMessage =
-            "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
+        final String expectedMessage = "Key "
+            + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+            + " does not exists in storage";
 
         assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
             .isInstanceOf(RemoteResourceNotFoundException.class)
@@ -533,15 +535,15 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
         );
         rsm.configure(config);
 
-        final ObjectKey objectKey = new ObjectKey("");
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
 
         // Ensure the manifest exists.
-        writeManifest(objectKey);
+        writeManifest(objectKeyFactory);
 
         // Make sure the exception is connected to the index file.
-        final String expectedMessage =
-            "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType))
-                + " does not exists in storage";
+        final String expectedMessage = "Key "
+            + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.fromIndexType(indexType))
+            + " does not exists in storage";
 
         assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
             .isInstanceOf(RemoteResourceNotFoundException.class)
@@ -560,9 +562,10 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
         rsm.configure(config);
 
         // Make sure the exception is connected to the manifest file.
-        final ObjectKey objectKey = new ObjectKey("");
-        final String expectedMessage =
-            "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("", false);
+        final String expectedMessage = "Key "
+            + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+            + " does not exists in storage";
 
         assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
             .isInstanceOf(RemoteResourceNotFoundException.class)
@@ -570,14 +573,15 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
             .hasStackTraceContaining(expectedMessage);
     }
 
-    private void writeManifest(final ObjectKey objectKey) throws IOException {
+    private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOException {
         // Ensure the manifest exists.
         final String manifest = "{\"version\":\"1\","
             + "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100,"
             + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
             + "\"compression\":false}";
-        final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST));
+        final Path manifestPath = targetDir.resolve(
+            objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value());
         Files.createDirectories(manifestPath.getParent());
         Files.writeString(manifestPath, manifest);
     }
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
similarity index 62%
rename from core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java
rename to core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
index a695217ff..e3ecbb7ec 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
@@ -19,19 +19,22 @@
 import java.text.NumberFormat;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiFunction;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+
 import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_KEY;
 import static io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField.OBJECT_PREFIX;
 
 /**
  * Maps Kafka segment files to object paths/keys in the storage backend.
  */
-public final class ObjectKey {
+public final class ObjectKeyFactory {
 
     /**
      * Supported files and extensions, including log, index types, and segment manifest.
@@ -67,9 +70,17 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {
     }
 
     private final String prefix;
+    private final BiFunction<String, String, ObjectKey> objectKeyConstructor;
 
-    public ObjectKey(final String prefix) {
+    /**
+     * @param prefix the prefix to add to all created keys.
+     * @param maskPrefix whether to mask the prefix in {@code toString()}.
+     */
+    public ObjectKeyFactory(final String prefix, final boolean maskPrefix) {
         this.prefix = prefix == null ? "" : prefix;
+        this.objectKeyConstructor = maskPrefix
+            ? ObjectKeyWithMaskedPrefix::new
+            : PlainObjectKey::new;
     }
 
     /**
@@ -82,15 +93,13 @@ public ObjectKey(final String prefix) {
      *
      * <p>For example:
      * {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
      *
-     * @see ObjectKey#mainPath(RemoteLogSegmentMetadata)
+     * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata)
      */
-    public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
+    public ObjectKey key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
         Objects.requireNonNull(suffix, "suffix cannot be null");
 
-        return prefix
-            + mainPath(remoteLogSegmentMetadata)
-            + "." + suffix.value;
+        return objectKeyConstructor.apply(prefix, mainPath(remoteLogSegmentMetadata) + "." + suffix.value);
     }
 
     /**
@@ -103,16 +112,16 @@ public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final
      *
     * <p>For example:
      * {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
      */
-    public String key(final Map<Integer, Object> fields,
-                      final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                      final Suffix suffix) {
+    public ObjectKey key(final Map<Integer, Object> fields,
+                         final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                         final Suffix suffix) {
         Objects.requireNonNull(fields, "fields cannot be null");
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
         Objects.requireNonNull(suffix, "suffix cannot be null");
         final var prefix = (String) fields.getOrDefault(OBJECT_PREFIX.index(), this.prefix);
         final var main = (String) fields.getOrDefault(OBJECT_KEY.index(), mainPath(remoteLogSegmentMetadata));
-        return prefix + main + "." + suffix.value;
+        return objectKeyConstructor.apply(prefix, main + "." + suffix.value);
     }
 
     /**
@@ -150,4 +159,65 @@ private static String filenamePrefixFromOffset(final long offset) {
         nf.setGroupingUsed(false);
         return nf.format(offset);
     }
+
+    /**
+     * The object key that consists of a prefix and main part + suffix.
+     *
+     * <p>Its string representation is identical to its value.
+     */
+    static class PlainObjectKey implements ObjectKey {
+        protected final String prefix;
+        protected final String mainPathAndSuffix;
+
+        PlainObjectKey(final String prefix, final String mainPathAndSuffix) {
+            this.prefix = Objects.requireNonNull(prefix, "prefix cannot be null");
+            this.mainPathAndSuffix = Objects.requireNonNull(mainPathAndSuffix, "mainPathAndSuffix cannot be null");
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final PlainObjectKey that = (PlainObjectKey) o;
+            return Objects.equals(prefix, that.prefix)
+                && Objects.equals(mainPathAndSuffix, that.mainPathAndSuffix);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = prefix.hashCode();
+            result = 31 * result + mainPathAndSuffix.hashCode();
+            return result;
+        }
+
+        @Override
+        public String value() {
+            return prefix + mainPathAndSuffix;
+        }
+
+        @Override
+        public String toString() {
+            return value();
+        }
+    }
+
+    /**
+     * The object key that consists of a prefix and main part + suffix (as the parent class {@link PlainObjectKey}).
+     *
+     * <p>In its string representation, the prefix is masked with {@code <prefix>/}.
+     */
+    static class ObjectKeyWithMaskedPrefix extends PlainObjectKey {
+        ObjectKeyWithMaskedPrefix(final String prefix, final String mainPathAndSuffix) {
+            super(prefix, mainPathAndSuffix);
+        }
+
+        @Override
+        public String toString() {
+            return "<prefix>/" + mainPathAndSuffix;
+        }
+    }
 }
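A minimal usage sketch of the new factory, not part of the patch (the metadata argument stands for any RemoteLogSegmentMetadata; the masked string form is taken from the prefixMasking tests further down). Storage operations keep using value(), so masking changes only what gets printed:

    // Sketch only: the two factory modes differ solely in toString().
    final ObjectKeyFactory masked = new ObjectKeyFactory("cluster-1/", true);
    final ObjectKey key = masked.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
    key.value();     // "cluster-1/topic-.../7/00000000000000001234-....log" -- the real storage key
    key.toString();  // "<prefix>/topic-.../7/00000000000000001234-....log" -- what ends up in logs

    final ObjectKeyFactory plain = new ObjectKeyFactory("cluster-1/", false);
    plain.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG).toString();  // identical to value()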
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 43fbe2d5b..99a9dee99 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -64,6 +64,7 @@
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
 import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
 import io.aiven.kafka.tieredstorage.storage.StorageBackend;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
@@ -113,7 +114,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
     private AesEncryptionProvider aesEncryptionProvider;
     private ObjectMapper mapper;
     private ChunkManager chunkManager;
-    private ObjectKey objectKey;
+    private ObjectKeyFactory objectKeyFactory;
     private SegmentCustomMetadataSerde customMetadataSerde;
     private Set<SegmentCustomMetadataField> customMetadataFields;
 
@@ -138,7 +139,7 @@ public void configure(final Map<String, ?> configs) {
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
         metrics = new Metrics(time, metricConfig);
         setStorage(config.storage());
-        objectKey = new ObjectKey(config.keyPrefix());
+        objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask());
         encryptionEnabled = config.encryptionEnabled();
         if (encryptionEnabled) {
             final Map<String, KeyPair> keyRing = new HashMap<>();
@@ -195,7 +196,7 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
         log.info("Copying log segment data, metadata: {}", remoteLogSegmentMetadata);
 
         final var customMetadataBuilder =
-            new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata);
+            new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata);
 
         final long startedMs = time.milliseconds();
 
@@ -286,15 +287,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
                                   final TransformFinisher transformFinisher,
                                   final SegmentCustomMetadataBuilder customMetadataBuilder)
         throws IOException, StorageBackendException {
-        final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
+        final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
         try (final var sis = transformFinisher.toInputStream()) {
             final var bytes = uploader.upload(sis, fileKey);
             metrics.recordObjectUpload(
                 remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
-                ObjectKey.Suffix.LOG,
+                ObjectKeyFactory.Suffix.LOG,
                 bytes
             );
-            customMetadataBuilder.addUploadResult(ObjectKey.Suffix.LOG, bytes);
+            customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);
 
             log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
         }
@@ -316,8 +317,8 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
         final TransformFinisher transformFinisher =
             new TransformFinisher(transformEnum);
-        final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
-        final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+        final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType);
+        final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
         try (final var in = transformFinisher.toInputStream()) {
             final var bytes = uploader.upload(in, key);
             metrics.recordObjectUpload(
@@ -336,16 +337,17 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
                                 final SegmentCustomMetadataBuilder customMetadataBuilder)
         throws StorageBackendException, IOException {
         final String manifest = mapper.writeValueAsString(segmentManifest);
-        final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+        final ObjectKey manifestObjectKey =
+            objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
 
         try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
-            final var bytes = uploader.upload(manifestContent, manifestFileKey);
+            final var bytes = uploader.upload(manifestContent, manifestObjectKey);
             metrics.recordObjectUpload(
                 remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
-                ObjectKey.Suffix.MANIFEST,
+                ObjectKeyFactory.Suffix.MANIFEST,
                 bytes
             );
-            customMetadataBuilder.addUploadResult(ObjectKey.Suffix.MANIFEST, bytes);
+            customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, bytes);
 
             log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes);
         }
@@ -379,7 +381,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
 
             final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);
 
-            final var suffix = ObjectKey.Suffix.LOG;
+            final var suffix = ObjectKeyFactory.Suffix.LOG;
             final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
 
             return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
@@ -399,7 +401,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
 
             final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);
 
-            final var key = objectKey(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
+            final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.fromIndexType(indexType));
             final var in = fetcher.fetch(key);
 
             DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
@@ -420,21 +422,22 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
         }
     }
 
-    private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) {
-        final String segmentKey;
+    private ObjectKey objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                final ObjectKeyFactory.Suffix suffix) {
+        final ObjectKey segmentKey;
         if (remoteLogSegmentMetadata.customMetadata().isPresent()) {
             final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get();
             final var fields = customMetadataSerde.deserialize(customMetadataBytes.value());
-            segmentKey = objectKey.key(fields, remoteLogSegmentMetadata, suffix);
+            segmentKey = objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix);
         } else {
-            segmentKey = objectKey.key(remoteLogSegmentMetadata, suffix);
+            segmentKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
         }
         return segmentKey;
     }
 
     private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
         throws StorageBackendException, IOException {
-        final String manifestKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+        final ObjectKey manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
         return segmentManifestProvider.get(manifestKey);
     }
 
@@ -450,8 +453,8 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
 
         final long startedMs = time.milliseconds();
         try {
-            for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
-                final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+            for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) {
+                final ObjectKey key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
                 deleter.delete(key);
             }
         } catch (final Exception e) {
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java
index dde21c86a..ab84d349f 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManager.java
@@ -20,11 +20,12 @@
 import java.io.InputStream;
 
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 public interface ChunkManager {
 
-    InputStream getChunk(final String objectKeyPath,
+    InputStream getChunk(final ObjectKey objectKey,
                          final SegmentManifest manifest,
                          final int chunkId) throws StorageBackendException, IOException;
 }
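ChunkManager implementations now take the storage-level ObjectKey instead of a string path. Since DefaultChunkManagerTest (further down) constructs a key with a lambda, ObjectKey is evidently a functional interface whose single abstract method is value(); a sketch of stubbing it in a caller or test:

    // Sketch only: an ObjectKey stubbed with a lambda, as in DefaultChunkManagerTest.
    final ObjectKey key = () -> "topic/segment.log";
    final InputStream chunk = chunkManager.getChunk(key, manifest, 0);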
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java
index 5310476d0..ac219eee5 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java
@@ -25,6 +25,7 @@
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
 import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
@@ -46,11 +47,11 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi
      *
      * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
      */
-    public InputStream getChunk(final String objectKeyPath, final SegmentManifest manifest,
+    public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest,
                                 final int chunkId) throws StorageBackendException {
         final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
 
-        final InputStream chunkContent = fetcher.fetch(objectKeyPath, chunk.range());
+        final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range());
 
         DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
         final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java
index bea267398..23f39bf00 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java
@@ -33,6 +33,7 @@
 import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 import com.github.benmanes.caffeine.cache.AsyncCache;
@@ -65,10 +66,10 @@ protected ChunkCache(final ChunkManager chunkManager) {
      * opened right when fetching from cache happens even if the actual value is removed from the cache,
      * the InputStream will still contain the data.
      */
-    public InputStream getChunk(final String objectKeyPath,
+    public InputStream getChunk(final ObjectKey objectKey,
                                 final SegmentManifest manifest,
                                 final int chunkId) throws StorageBackendException, IOException {
-        final ChunkKey chunkKey = new ChunkKey(objectKeyPath, chunkId);
+        final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
         final AtomicReference<InputStream> result = new AtomicReference<>();
         try {
             return cache.asMap()
@@ -77,7 +78,7 @@ public InputStream getChunk(final String objectKeyPath,
                     statsCounter.recordMiss();
                     try {
                         final InputStream chunk =
-                            chunkManager.getChunk(objectKeyPath, manifest, chunkId);
+                            chunkManager.getChunk(objectKey, manifest, chunkId);
                         final T t = this.cacheChunk(chunkKey, chunk);
                         result.getAndSet(cachedChunkToInputStream(t));
                         return t;
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
index 5e1c9e4ad..cf295ba72 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java
@@ -48,6 +48,9 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
     private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
     private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";
 
+    private static final String OBJECT_KEY_PREFIX_MASK_CONFIG = "key.prefix.mask";
+    private static final String OBJECT_KEY_PREFIX_MASK_DOC = "Whether to mask path prefix in logs";
+
     private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
     private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
     private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L;  // TODO consider a better default
@@ -115,6 +118,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
             OBJECT_KEY_PREFIX_DOC
         );
 
+        CONFIG.define(
+            OBJECT_KEY_PREFIX_MASK_CONFIG,
+            ConfigDef.Type.BOOLEAN,
+            false,
+            ConfigDef.Importance.LOW,
+            OBJECT_KEY_PREFIX_MASK_DOC
+        );
+
         CONFIG.define(
             SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
             ConfigDef.Type.LONG,
@@ -335,6 +346,10 @@ public String keyPrefix() {
         return getString(OBJECT_KEY_PREFIX_CONFIG);
     }
 
+    public boolean keyPrefixMask() {
+        return getBoolean(OBJECT_KEY_PREFIX_MASK_CONFIG);
+    }
+
     public int chunkSize() {
         return getInt(CHUNK_SIZE_CONFIG);
     }
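A hedged configuration sketch for the new flag (key names as defined above; `otherRequiredConfigs`, holding the storage backend and other mandatory settings, is a hypothetical placeholder):

    // Sketch only: enabling prefix masking next to the existing key.prefix setting.
    final Map<String, Object> configs = new HashMap<>(otherRequiredConfigs);
    configs.put("key.prefix", "cluster-1/");
    configs.put("key.prefix.mask", true);  // defaults to false
    remoteStorageManager.configure(configs);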
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
index af2a76d5d..c8f8eb885 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java
@@ -27,6 +27,7 @@
 
 import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,7 +38,7 @@ public class SegmentManifestProvider {
     private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
     private static final long GET_TIMEOUT_SEC = 10;
 
-    private final AsyncLoadingCache<String, SegmentManifest> cache;
+    private final AsyncLoadingCache<ObjectKey, SegmentManifest> cache;
 
     /**
      * @param maxCacheSize the max cache size (in items) or empty if the cache is unbounded.
@@ -62,7 +63,7 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
         statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
     }
 
-    public SegmentManifest get(final String manifestKey)
+    public SegmentManifest get(final ObjectKey manifestKey)
         throws StorageBackendException, IOException {
         try {
             return cache.get(manifestKey).get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
index b6176f7b7..88841e53e 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
@@ -23,25 +23,25 @@
 
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
 
 public class SegmentCustomMetadataBuilder {
-    final ObjectKey objectKey;
+    final ObjectKeyFactory objectKeyFactory;
     final RemoteLogSegmentMetadata segmentMetadata;
 
-    final EnumMap<ObjectKey.Suffix, Long> uploadResults;
+    final EnumMap<ObjectKeyFactory.Suffix, Long> uploadResults;
 
     final Set<SegmentCustomMetadataField> fields;
 
     public SegmentCustomMetadataBuilder(final Set<SegmentCustomMetadataField> fields,
-                                        final ObjectKey objectKey,
+                                        final ObjectKeyFactory objectKeyFactory,
                                         final RemoteLogSegmentMetadata segmentMetadata) {
         this.fields = fields;
-        this.objectKey = objectKey;
+        this.objectKeyFactory = objectKeyFactory;
         this.segmentMetadata = segmentMetadata;
-        this.uploadResults = new EnumMap<>(ObjectKey.Suffix.class);
+        this.uploadResults = new EnumMap<>(ObjectKeyFactory.Suffix.class);
     }
 
-    public SegmentCustomMetadataBuilder addUploadResult(final ObjectKey.Suffix suffix,
+    public SegmentCustomMetadataBuilder addUploadResult(final ObjectKeyFactory.Suffix suffix,
                                                         final long bytes) {
         if (uploadResults.containsKey(suffix)) {
             throw new IllegalArgumentException("Upload results for suffix " + suffix + " already added");
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
index 284cd8bd6..6e63bf0d3 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
@@ -24,13 +24,13 @@
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
 
 // index define values on custom metadata fields and cannot be changed without breaking compatibility.
 public enum SegmentCustomMetadataField {
     REMOTE_SIZE(0, new Field("remote_size", Type.VARLONG), SegmentCustomMetadataBuilder::totalSize),
-    OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKey.prefix()),
-    OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKey.mainPath(b.segmentMetadata));
+    OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKeyFactory.prefix()),
+    OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKeyFactory.mainPath(b.segmentMetadata));
 
     static final TaggedFieldsSection FIELDS_SECTION = TaggedFieldsSection.of(
         REMOTE_SIZE.index, REMOTE_SIZE.field,
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
index 63cbe306e..177d5885a 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
@@ -30,7 +30,7 @@
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,14 +210,14 @@ private void recordSegmentFetchRequestedBytes(final TopicPartition topicPartitio
             .record(bytes);
     }
 
-    public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
+    public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix,
                                    final long bytes) {
         recordObjectUploadRequests(topicPartition, suffix);
         recordObjectUploadBytes(topicPartition, suffix, bytes);
     }
 
     private void recordObjectUploadBytes(final TopicPartition topicPartition,
-                                         final ObjectKey.Suffix suffix,
+                                         final ObjectKeyFactory.Suffix suffix,
                                          final long bytes) {
         new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES))
             .with(metricsRegistry.objectUploadBytesRate, new Rate())
@@ -258,7 +258,7 @@ private void recordObjectUploadBytes(final TopicPartition topicPartition,
             .record(bytes);
     }
 
-    private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) {
+    private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix) {
         new SensorProvider(metrics, sensorName(OBJECT_UPLOAD))
             .with(metricsRegistry.objectUploadRequestsRate, new Rate())
             .with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount())
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
index f321ac415..ee3265f6b 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
@@ -21,7 +21,7 @@
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.TopicPartition;
 
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
 
 public class MetricsRegistry {
 
@@ -190,7 +190,7 @@ public static String sensorName(final String name) {
         return name;
     }
 
-    public static String sensorNameByObjectType(final ObjectKey.Suffix suffix, final String name) {
+    public static String sensorNameByObjectType(final ObjectKeyFactory.Suffix suffix, final String name) {
         return TAG_NAME_OBJECT_TYPE + "." + suffix.value + "." + name;
     }
 
@@ -199,7 +199,7 @@ public static String sensorNameByTopic(final TopicPartition topicPartition, fina
     }
 
     public static String sensorNameByTopicAndObjectType(final TopicPartition topicPartition,
-                                                        final ObjectKey.Suffix suffix,
+                                                        final ObjectKeyFactory.Suffix suffix,
                                                         final String name) {
         return TAG_NAME_TOPIC + "." + topicPartition.topic()
             + "." + TAG_NAME_OBJECT_TYPE + "." + suffix.value
@@ -213,7 +213,7 @@ public static String sensorNameByTopicPartition(final TopicPartition topicPartit
     }
 
     public static String sensorNameByTopicPartitionAndObjectType(final TopicPartition topicPartition,
-                                                                 final ObjectKey.Suffix suffix,
+                                                                 final ObjectKeyFactory.Suffix suffix,
                                                                  final String name) {
         return TAG_NAME_TOPIC + "." + topicPartition.topic()
             + "." + TAG_NAME_PARTITION + "." + topicPartition.partition()
@@ -226,7 +226,7 @@ static Map<String, String> topicTags(final TopicPartition topicPartition) {
     }
 
     static Map<String, String> topicAndObjectTypeTags(final TopicPartition topicPartition,
-                                                      final ObjectKey.Suffix suffix) {
+                                                      final ObjectKeyFactory.Suffix suffix) {
         return Map.of(
             TAG_NAME_TOPIC, topicPartition.topic(),
             TAG_NAME_OBJECT_TYPE, suffix.value
@@ -241,7 +241,7 @@ static Map<String, String> topicPartitionTags(final TopicPartition topicPartitio
     }
 
     static Map<String, String> topicPartitionAndObjectTypeTags(final TopicPartition topicPartition,
-                                                               final ObjectKey.Suffix suffix) {
+                                                               final ObjectKeyFactory.Suffix suffix) {
         return Map.of(
             TAG_NAME_TOPIC, topicPartition.topic(),
             TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()),
@@ -249,7 +249,7 @@ static Map<String, String> topicPartitionAndObjectTypeTags(final TopicPartition
         );
     }
 
-    static Map<String, String> objectTypeTags(final ObjectKey.Suffix suffix) {
+    static Map<String, String> objectTypeTags(final ObjectKeyFactory.Suffix suffix) {
         return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value);
     }
 }
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
index 9531b0ca0..922a1574d 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumeration.java
@@ -30,13 +30,14 @@
 import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
 import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
 import org.apache.commons.io.input.BoundedInputStream;
 
 public class FetchChunkEnumeration implements Enumeration<InputStream> {
     private final ChunkManager chunkManager;
-    private final String objectKeyPath;
+    private final ObjectKey objectKey;
     private final SegmentManifest manifest;
     private final BytesRange range;
     final int startChunkId;
@@ -48,16 +49,16 @@ public class FetchChunkEnumeration implements Enumeration<InputStream> {
     /**
      *
      * @param chunkManager provides chunk input to fetch from
-     * @param objectKeyPath required by chunkManager
+     * @param objectKey required by chunkManager
      * @param manifest provides to index to build response from
      * @param range original offset range start/end position
     */
     public FetchChunkEnumeration(final ChunkManager chunkManager,
-                                 final String objectKeyPath,
+                                 final ObjectKey objectKey,
                                  final SegmentManifest manifest,
                                  final BytesRange range) {
         this.chunkManager = Objects.requireNonNull(chunkManager, "chunkManager cannot be null");
-        this.objectKeyPath = Objects.requireNonNull(objectKeyPath, "objectKeyPath cannot be null");
+        this.objectKey = Objects.requireNonNull(objectKey, "objectKey cannot be null");
         this.manifest = Objects.requireNonNull(manifest, "manifest cannot be null");
         this.range = Objects.requireNonNull(range, "range cannot be null");
 
@@ -74,7 +75,7 @@ private Chunk getFirstChunk(final int fromPosition) {
         final Chunk firstChunk = chunkIndex.findChunkForOriginalOffset(fromPosition);
         if (firstChunk == null) {
             throw new IllegalArgumentException("Invalid start position " + fromPosition
-                + " in segment path " + objectKeyPath);
+                + " in segment path " + objectKey);
         }
         return firstChunk;
     }
@@ -137,7 +138,7 @@ public InputStream nextElement() {
 
     private InputStream getChunkContent(final int chunkId) {
         try {
-            return chunkManager.getChunk(objectKeyPath, manifest, chunkId);
+            return chunkManager.getChunk(objectKey, manifest, chunkId);
         } catch (final KeyNotFoundException e) {
             throw new KeyNotFoundRuntimeException(e);
         } catch (final StorageBackendException | IOException e) {
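Because the error message above now embeds the ObjectKey itself rather than its string value, it renders through ObjectKey.toString() and a masked key keeps the prefix out of exception text; a sketch of the resulting difference, assuming masking is enabled:

    // Sketch only: how the same IllegalArgumentException message renders.
    // unmasked: Invalid start position 42 in segment path cluster-1/topic-.../00000000000000001234-....log
    // masked:   Invalid start position 42 in segment path <prefix>/topic-.../00000000000000001234-....log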
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyEqualsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyEqualsTest.java
new file mode 100644
index 000000000..2395d8dbd
--- /dev/null
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyEqualsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ObjectKeyEqualsTest {
+    @ParameterizedTest
+    @ValueSource(classes = {ObjectKeyFactory.PlainObjectKey.class, ObjectKeyFactory.ObjectKeyWithMaskedPrefix.class})
+    void identical(final Class<?> keyClass) throws ReflectiveOperationException {
+        final var constructor = keyClass.getDeclaredConstructor(String.class, String.class);
+        final var k1 = constructor.newInstance("prefix", "mainPathAndSuffix");
+        final var k2 = constructor.newInstance("prefix", "mainPathAndSuffix");
+        assertThat(k1).isEqualTo(k2);
+        assertThat(k2).isEqualTo(k1);
+        assertThat(k1).hasSameHashCodeAs(k2);
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {ObjectKeyFactory.PlainObjectKey.class, ObjectKeyFactory.ObjectKeyWithMaskedPrefix.class})
+    void differentPrefix(final Class<?> keyClass) throws ReflectiveOperationException {
+        final var constructor = keyClass.getDeclaredConstructor(String.class, String.class);
+        final var k1 = constructor.newInstance("prefix1", "mainPathAndSuffix");
+        final var k2 = constructor.newInstance("prefix2", "mainPathAndSuffix");
+        assertThat(k1).isNotEqualTo(k2);
+        assertThat(k2).isNotEqualTo(k1);
+        assertThat(k1).doesNotHaveSameHashCodeAs(k2);
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {ObjectKeyFactory.PlainObjectKey.class, ObjectKeyFactory.ObjectKeyWithMaskedPrefix.class})
+    void differentMainPathAndSuffix(final Class<?> keyClass)
+        throws ReflectiveOperationException {
+        final var constructor = keyClass.getDeclaredConstructor(String.class, String.class);
+        final var k1 = constructor.newInstance("prefix", "mainPathAndSuffix1");
+        final var k2 = constructor.newInstance("prefix", "mainPathAndSuffix2");
+        assertThat(k1).isNotEqualTo(k2);
+        assertThat(k2).isNotEqualTo(k1);
+        assertThat(k1).doesNotHaveSameHashCodeAs(k2);
+    }
+}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java
new file mode 100644
index 000000000..008aa7afc
--- /dev/null
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage;
+
+import java.util.Map;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+
+import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ObjectKeyFactoryTest {
+    static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID;  // string representation: AAAAAAAAAAAAAAAAAAAAAQ
+    static final Uuid SEGMENT_ID = Uuid.ZERO_UUID;  // string representation: AAAAAAAAAAAAAAAAAAAAAA
+    static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7));
+    static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID);
+    static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
+        REMOTE_LOG_SEGMENT_ID, 1234L, 2000L,
+        0, 0, 0, 0, Map.of(0, 0L));
+
+    @Test
+    void test() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+            .value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value())
+            .isEqualTo(
+                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+    }
+
+    @Test
+    void withCustomFieldsEmpty() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
+        final Map<Integer, Object> fields = Map.of();
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+                .value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()
+        ).isEqualTo(
+            "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+    }
+
+    @Test
+    void withCustomFieldsOnlyPrefix() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
+        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+                .value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()
+        ).isEqualTo(
+            "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+                + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+    }
+
+    @Test
+    void withCustomFieldsOnlyKey() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
+        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
+        ).isEqualTo("prefix/topic/7/file.log");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value()
+        ).isEqualTo("prefix/topic/7/file.index");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value()
+        ).isEqualTo("prefix/topic/7/file.timeindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value()
+        ).isEqualTo("prefix/topic/7/file.snapshot");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value()
+        ).isEqualTo("prefix/topic/7/file.txnindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+                .value()
+        ).isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()
+        ).isEqualTo("prefix/topic/7/file.rsm-manifest");
+    }
+
+    @Test
+    void withCustomFieldsAll() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", false);
+        final Map<Integer, Object> fields = Map.of(
+            SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
+            SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value()
+        ).isEqualTo("other/topic/7/file.log");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX).value()
+        ).isEqualTo("other/topic/7/file.index");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX).value()
+        ).isEqualTo("other/topic/7/file.timeindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT).value()
+        ).isEqualTo("other/topic/7/file.snapshot");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX).value()
+        ).isEqualTo("other/topic/7/file.txnindex");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+                .value()
+        ).isEqualTo("other/topic/7/file.leader-epoch-checkpoint");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST).value()
+        ).isEqualTo("other/topic/7/file.rsm-manifest");
+    }
+
+    @Test
+    void nullPrefix() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null, false);
+        assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG).value())
+            .isEqualTo(
+                "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+    }
+
+    @Test
+    void suffixForIndexTypes() {
+        assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET))
+            .isEqualTo(ObjectKeyFactory.Suffix.OFFSET_INDEX)
+            .extracting("value")
+            .isEqualTo("index");
+        assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP))
+            .isEqualTo(ObjectKeyFactory.Suffix.TIME_INDEX)
+            .extracting("value")
+            .isEqualTo("timeindex");
+        assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT))
+            .isEqualTo(ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+            .extracting("value")
+            .isEqualTo("snapshot");
+        assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION))
+            .isEqualTo(ObjectKeyFactory.Suffix.TXN_INDEX)
+            .extracting("value")
+            .isEqualTo("txnindex");
+        assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH))
+            .isEqualTo(ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+            .extracting("value")
+            .isEqualTo("leader-epoch-checkpoint");
+    }
+
+    @Test
+    void prefixMasking() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/", true);
+        assertThat(
+            objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+        ).hasToString(
+            "<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+    }
+
+    @Test
+    void prefixMaskingWithCustomFieldsEmpty() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
+        final Map<Integer, Object> fields = Map.of();
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+        ).hasToString(
+            "<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+    }
+
+    @Test
+    void prefixMaskingWithCustomFieldsOnlyPrefix() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
+        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+        ).hasToString(
+            "<prefix>/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+    }
+
+    @Test
+    void prefixMaskingWithCustomFieldsOnlyKey() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
+        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+        ).hasToString("<prefix>/topic/7/file.log");
+    }
+
+    @Test
+    void prefixMaskingWithCustomFieldsAll() {
+        final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("real-prefix/", true);
+        final Map<Integer, Object> fields = Map.of(
+            SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
+            SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+        assertThat(
+            objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+        ).hasToString("<prefix>/topic/7/file.log");
+    }
+}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java
deleted file mode 100644
index 4ed07a959..000000000
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2023 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.tieredstorage;
-
-import java.util.Map;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
-
-import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class ObjectKeyTest {
-    static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID;  // string representation: AAAAAAAAAAAAAAAAAAAAAQ
-    static final Uuid SEGMENT_ID = Uuid.ZERO_UUID;  // string representation: AAAAAAAAAAAAAAAAAAAAAA
-    static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7));
-    static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID);
-    static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
-        REMOTE_LOG_SEGMENT_ID, 1234L, 2000L,
-        0, 0, 0, 0, Map.of(0, 0L));
-
-    @Test
-    void test() {
-        final ObjectKey objectKey = new ObjectKey("prefix/");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
-    }
-
-    @Test
-    void withCustomFieldsEmpty() {
-        final ObjectKey objectKey = new ObjectKey("prefix/");
-        final Map<Integer, Object> fields = Map.of();
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
-            .isEqualTo(
-                "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
-    }
-
-    @Test
-    void withCustomFieldsOnlyPrefix() {
-        final ObjectKey objectKey = new ObjectKey("prefix/");
-        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
-            .isEqualTo(
-                "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
-                    + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
-    }
-
-    @Test
-    void withCustomFieldsOnlyKey() {
-        final ObjectKey objectKey = new ObjectKey("prefix/");
-        final Map<Integer, Object> fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo("prefix/topic/7/file.log");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
-            .isEqualTo("prefix/topic/7/file.index");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
-            .isEqualTo("prefix/topic/7/file.timeindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
-            .isEqualTo("prefix/topic/7/file.snapshot");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
-            .isEqualTo("prefix/topic/7/file.txnindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
-            .isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
-            .isEqualTo("prefix/topic/7/file.rsm-manifest");
-    }
-
-    @Test
-    void withCustomFieldsAll() {
-        final ObjectKey objectKey = new ObjectKey("prefix/");
-        final Map<Integer, Object> fields = Map.of(
-            SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
-            SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo("other/topic/7/file.log");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
-            .isEqualTo("other/topic/7/file.index");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
-            .isEqualTo("other/topic/7/file.timeindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
-            .isEqualTo("other/topic/7/file.snapshot");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
-            .isEqualTo("other/topic/7/file.txnindex");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
-            .isEqualTo("other/topic/7/file.leader-epoch-checkpoint");
-        assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
-            .isEqualTo("other/topic/7/file.rsm-manifest");
-    }
-
-    @Test
-    void nullPrefix() {
-        final ObjectKey objectKey = new ObjectKey(null);
-        assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
-            .isEqualTo(
-                "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
-    }
-
-    @Test
-    void suffixForIndexTypes() {
-        assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET))
-            .isEqualTo(ObjectKey.Suffix.OFFSET_INDEX)
-            .extracting("value")
-            .isEqualTo("index");
-        assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP))
-            .isEqualTo(ObjectKey.Suffix.TIME_INDEX)
-            .extracting("value")
-            .isEqualTo("timeindex");
-        assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT))
-            .isEqualTo(ObjectKey.Suffix.PRODUCER_SNAPSHOT)
-            .extracting("value")
-            .isEqualTo("snapshot");
-        assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION))
-            .isEqualTo(ObjectKey.Suffix.TXN_INDEX)
-            .extracting("value")
-            .isEqualTo("txnindex");
-        assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH))
-            .isEqualTo(ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)
-            .extracting("value")
-            .isEqualTo("leader-epoch-checkpoint");
-    }
-}
ObjectName.getInstance(objectName + ",object-type=" + suffix.value); switch (suffix) { case TXN_INDEX: diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index 6335a8763..0d58aa94d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import com.github.luben.zstd.ZstdCompressCtx; @@ -42,7 +43,7 @@ @ExtendWith(MockitoExtension.class) class DefaultChunkManagerTest extends AesKeyAwareTest { - static final String OBJECT_KEY_PATH = "topic/segment.log"; + static final ObjectKey OBJECT_KEY = () -> "topic/segment.log"; static final byte[] TEST_CHUNK_CONTENT = "0123456789".getBytes(); @Mock private StorageBackend storage; @@ -53,11 +54,11 @@ void testGetChunk() throws Exception { final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasContent("0123456789"); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasContent("0123456789"); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @Test @@ -73,15 +74,15 @@ void testGetChunkWithEncryption() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, encrypted.length, encrypted.length); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())).thenReturn( + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())).thenReturn( new ByteArrayInputStream(encrypted)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad), null); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); - assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } @Test @@ -94,13 +95,13 @@ void testGetChunkWithCompression() throws Exception { } final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, compressed.length, compressed.length); - when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); - 
assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); - verify(storage).fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range()); + assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); + verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java index 76e3ea4d4..d5460edc8 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -49,7 +50,7 @@ class ChunkCacheMetricsTest { static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); - public static final String OBJECT_KEY_PATH = "topic/segment"; + public static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment"; @TempDir static Path baseCachePath; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java index f759895b7..d4e2db78a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java @@ -28,6 +28,7 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import com.github.benmanes.caffeine.cache.RemovalCause; @@ -68,6 +69,8 @@ class ChunkCacheTest { new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, false, null, null); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; private static final String SEGMENT_KEY = "topic/segment"; + private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY; + @Mock private ChunkManager chunkManager; private ChunkCache chunkCache; @@ -90,9 +93,9 @@ class CacheTests { @BeforeEach void setUp() throws Exception { doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); } @@ -103,17 +106,17 @@ void noEviction() throws IOException, StorageBackendException { "size", "-1" )); - final InputStream chunk0 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + final InputStream chunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThat(chunk0).hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - final InputStream cachedChunk0 
= chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - final InputStream chunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + final InputStream chunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); assertThat(chunk1).hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); + final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -127,19 +130,19 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte "size", "-1" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); Thread.sleep(100); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); verifyNoMoreInteractions(chunkManager); @@ -152,9 +155,9 @@ void timeBasedEviction() throws IOException, StorageBackendException, Interrupte any(), eq(RemovalCause.EXPIRED)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager, times(2)).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + verify(chunkManager, times(2)).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); } @Test @@ -164,16 +167,16 @@ void sizeBasedEviction() throws IOException, StorageBackendException { "size", "18" )); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); verifyNoMoreInteractions(chunkManager); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1); + verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, 
SEGMENT_MANIFEST, 1); await().atMost(Duration.ofMillis(5000)) .pollDelay(Duration.ofSeconds(2)) @@ -182,11 +185,11 @@ void sizeBasedEviction() throws IOException, StorageBackendException { verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .hasBinaryContent(CHUNK_0); - assertThat(chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .hasBinaryContent(CHUNK_1); - verify(chunkManager, times(3)).getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt()); + verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt()); } } @@ -205,32 +208,32 @@ void setUp() { @Test void failedFetching() throws Exception { - when(chunkManager.getChunk(eq(SEGMENT_KEY), eq(SEGMENT_MANIFEST), anyInt())) + when(chunkManager.getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt())) .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(StorageBackendException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 1)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) .isInstanceOf(IOException.class) .hasMessage(TEST_EXCEPTION_MESSAGE); } @Test void failedReadingCachedValueWithInterruptedException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .thenReturn(new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new InterruptedException(TEST_EXCEPTION_MESSAGE); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(InterruptedException.class) @@ -239,15 +242,15 @@ void failedReadingCachedValueWithInterruptedException() throws Exception { @Test void failedReadingCachedValueWithExecutionException() throws Exception { - when(chunkManager.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( + when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( new ByteArrayInputStream(CHUNK_0)); doCallRealMethod().doAnswer(invocation -> { throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); }).when(chunkCache).cachedChunkToInputStream(any()); - chunkCache.getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0); + chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_KEY, SEGMENT_MANIFEST, 0)) + .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(ExecutionException.class) .hasRootCauseInstanceOf(RuntimeException.class) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java index 9b255b743..f353f4566 
100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/NoopStorageBackend.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -37,22 +38,22 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { return 0; } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { return null; } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { return null; } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java index 3b3cd86d9..74b78aa69 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java @@ -51,6 +51,7 @@ void minimalConfig() { assertThat(config.encryptionKeyPairId()).isNull(); assertThat(config.encryptionKeyRing()).isNull(); assertThat(config.keyPrefix()).isEmpty(); + assertThat(config.keyPrefixMask()).isFalse(); assertThat(config.customMetadataKeysIncluded()).isEmpty(); } @@ -338,4 +339,16 @@ void invalidCustomMetadataFields() { .hasMessage("Invalid value unknown for configuration custom.metadata.fields.include: " + "String must be one of: REMOTE_SIZE, OBJECT_PREFIX, OBJECT_KEY"); } + + @Test + void keyPrefixMasking() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "key.prefix.mask", "true" + ) + ); + assertThat(config.keyPrefixMask()).isTrue(); + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java index b81e3fdd6..059f78157 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java @@ -25,6 +25,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -39,7 +40,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; 
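With keys now typed as ObjectKey instead of String, the Mockito matcher swap just below (anyString() to any()) is forced by the compiler: anyString() yields a String and no longer fits fetch(ObjectKey). A minimal sketch of the resulting stubbing pattern, assuming the same mocked StorageBackend these tests use (the sketch itself is not part of the patch):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import io.aiven.kafka.tieredstorage.storage.ObjectKey;
    import io.aiven.kafka.tieredstorage.storage.StorageBackend;
    import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

    class StorageStubbingSketch {
        void stubFetchFailure() throws StorageBackendException {
            final StorageBackend storage = mock(StorageBackend.class);
            // any() matches every ObjectKey implementation, lambda-backed test keys included.
            when(storage.fetch(any())).thenThrow(new StorageBackendException("test"));
            final ObjectKey key = () -> "topic/manifest";
            // Any fetch now fails: storage.fetch(key) throws StorageBackendException("test").
        }
    }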
-import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -48,7 +49,7 @@ @ExtendWith(MockitoExtension.class) class SegmentManifestProviderTest { static final ObjectMapper MAPPER = new ObjectMapper(); - public static final String MANIFEST_KEY = "topic/manifest"; + public static final ObjectKey MANIFEST_KEY = () -> "topic/manifest"; static { MAPPER.registerModule(new Jdk8Module()); @@ -91,7 +92,8 @@ void withoutRetentionLimitsShouldBeCreated() { @Test void shouldReturnAndCache() throws StorageBackendException, IOException { - final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"; + final ObjectKey key = + () -> "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"; when(storage.fetch(key)) .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); final SegmentManifestV1 expectedManifest = new SegmentManifestV1( @@ -106,7 +108,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException { @Test void shouldPropagateStorageBackendException() throws StorageBackendException { - when(storage.fetch(anyString())) + when(storage.fetch(any())) .thenThrow(new StorageBackendException("test")); assertThatThrownBy(() -> provider.get(MANIFEST_KEY)) .isInstanceOf(StorageBackendException.class) @@ -118,7 +120,7 @@ void shouldPropagateIOException(@Mock final InputStream isMock) throws StorageBa doAnswer(invocation -> { throw new IOException("test"); }).when(isMock).close(); - when(storage.fetch(anyString())) + when(storage.fetch(any())) .thenReturn(isMock); assertThatThrownBy(() -> provider.get(MANIFEST_KEY)) .isInstanceOf(IOException.class) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java index 74bc9ec99..7a1ed0dcc 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; import org.junit.jupiter.api.Test; @@ -43,24 +43,24 @@ class SegmentCustomMetadataBuilderTest { SEGMENT_ID), 1, 100, -1, -1, 1L, 100, Collections.singletonMap(1, 100L)); - static final ObjectKey OBJECT_KEY = new ObjectKey("p1"); + static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1", false); @Test void shouldBuildEmptyMap() { - final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); assertThat(b.build()).isEmpty(); // even when upload results are added - b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L); + b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L); assertThat(b.build()).isEmpty(); } @Test void shouldFailWhenAddingExistingSuffixUploadResult() { - final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new 
SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); - b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L); - assertThatThrownBy(() -> b.addUploadResult(ObjectKey.Suffix.MANIFEST, 20L)) + b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L); + assertThatThrownBy(() -> b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 20L)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Upload results for suffix MANIFEST already added"); } @@ -68,15 +68,15 @@ void shouldFailWhenAddingExistingSuffixUploadResult() { @Test void shouldIncludeTotalSize() { final var field = SegmentCustomMetadataField.REMOTE_SIZE; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, 0L)); // i.e. no upload results // but, when existing upload results b - .addUploadResult(ObjectKey.Suffix.LOG, 40L) - .addUploadResult(ObjectKey.Suffix.MANIFEST, 2L); + .addUploadResult(ObjectKeyFactory.Suffix.LOG, 40L) + .addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 2L); fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, 42L)); // i.e. sum @@ -85,7 +85,7 @@ void shouldIncludeTotalSize() { @Test void shouldIncludeObjectPrefix() { final var field = SegmentCustomMetadataField.OBJECT_PREFIX; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); final var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, "p1")); @@ -94,7 +94,7 @@ void shouldIncludeObjectPrefix() { @Test void shouldIncludeObjectKey() { final var field = SegmentCustomMetadataField.OBJECT_KEY; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); final var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, "topic-" + TOPIC_ID + "/0/00000000000000000001-" + SEGMENT_ID)); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 9fb048a53..028b87c76 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -33,6 +33,7 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.junit.jupiter.api.BeforeEach; @@ -49,7 +50,7 @@ @ExtendWith(MockitoExtension.class) class FetchChunkEnumerationSourceInputStreamClosingTest { - static final String OBJECT_KEY_PATH = "topic/segment"; + static final ObjectKey OBJECT_KEY = () -> "topic/segment"; static final int CHUNK_SIZE = 10; static final BytesRange RANGE1 = BytesRange.ofFromPositionAndSize(0, CHUNK_SIZE); @@ -79,7 +80,7 @@ 
void test(final Map config, final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(config); final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null); - final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY_PATH, SEGMENT_MANIFEST, range) + final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY, SEGMENT_MANIFEST, range) .toInputStream(); if (readFully) { is.readAllBytes(); @@ -132,13 +133,13 @@ private static class TestObjectFetcher implements ObjectFetcher { private final List openInputStreams = new ArrayList<>(); @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { throw new RuntimeException("Should not be called"); } @Override - public InputStream fetch(final String key, final BytesRange range) { - if (!key.equals(OBJECT_KEY_PATH)) { + public InputStream fetch(final ObjectKey key, final BytesRange range) { + if (!key.equals(OBJECT_KEY)) { throw new IllegalArgumentException("Invalid key: " + key); } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index c37b9987a..65e331c83 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -24,7 +24,9 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; +import io.aiven.kafka.tieredstorage.storage.TestObjectKey; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -44,7 +46,7 @@ class FetchChunkEnumerationTest { final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); - static final String SEGMENT_KEY_PATH = "topic/segment"; + static final ObjectKey SEGMENT_KEY = new TestObjectKey("topic/segment"); // Test scenarios // - Initialization @@ -58,7 +60,7 @@ void failsWhenLargerStartPosition() { final int to = from + 1; // Then assertThatThrownBy( - () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to))) + () -> new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to))) .hasMessage("Invalid start position " + from + " in segment path topic/segment"); } @@ -71,7 +73,7 @@ void endPositionIsWithinIndex() { final int to = 80; // Then final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); assertThat(fetchChunk.startChunkId).isEqualTo(0); assertThat(fetchChunk.lastChunkId).isEqualTo(8); } @@ -84,7 +86,7 @@ void endPositionIsOutsideIndex() { final int from = 0; final int to = 110; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); // Then 
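One subtlety in TestObjectFetcher above: lambda-backed keys inherit Object#equals, so the key.equals(OBJECT_KEY) guard passes only because the enumeration hands the very same instance through; two lambdas with identical value() are still distinct objects. A self-contained illustration (the nested interface is a local stand-in for the storage ObjectKey):

    public class ObjectKeyEqualitySketch {
        // Local stand-in mirroring the single-method storage contract.
        interface ObjectKey {
            String value();
        }

        public static void main(final String[] args) {
            final ObjectKey a = () -> "topic/segment";
            final ObjectKey b = () -> "topic/segment";
            System.out.println(a.equals(b));                  // false: identity equality, not value equality
            System.out.println(a.equals(a));                  // true: the same instance, as in the test
            System.out.println(a.value().equals(b.value()));  // true: the underlying paths match
        }
    }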
assertThat(fetchChunk.startChunkId).isEqualTo(0); assertThat(fetchChunk.lastChunkId).isEqualTo(9); @@ -98,8 +100,8 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { final int from = 32; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, fetchChunk.currentChunkId)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); + when(chunkManager.getChunk(SEGMENT_KEY, manifest, fetchChunk.currentChunkId)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchChunk.startChunkId).isEqualTo(fetchChunk.lastChunkId); @@ -116,12 +118,12 @@ void shouldReturnRangeFromMultipleChunks() throws StorageBackendException { final int from = 15; final int to = 34; final FetchChunkEnumeration fetchChunk = - new FetchChunkEnumeration(chunkManager, SEGMENT_KEY_PATH, manifest, BytesRange.of(from, to)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 1)) + new FetchChunkEnumeration(chunkManager, SEGMENT_KEY, manifest, BytesRange.of(from, to)); + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 1)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 2)) + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 2)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); - when(chunkManager.getChunk(SEGMENT_KEY_PATH, manifest, 3)) + when(chunkManager.getChunk(SEGMENT_KEY, manifest, 3)) .thenReturn(new ByteArrayInputStream(CHUNK_CONTENT)); // Then assertThat(fetchChunk.startChunkId).isNotEqualTo(fetchChunk.lastChunkId); diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java index 0ed538e24..e622dddfa 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/KeyNotFoundException.java @@ -17,15 +17,15 @@ package io.aiven.kafka.tieredstorage.storage; public class KeyNotFoundException extends StorageBackendException { - public KeyNotFoundException(final StorageBackend storage, final String key) { + public KeyNotFoundException(final StorageBackend storage, final ObjectKey key) { super(getMessage(storage, key)); } - public KeyNotFoundException(final StorageBackend storage, final String key, final Exception e) { + public KeyNotFoundException(final StorageBackend storage, final ObjectKey key, final Exception e) { super(getMessage(storage, key), e); } - private static String getMessage(final StorageBackend storage, final String key) { + private static String getMessage(final StorageBackend storage, final ObjectKey key) { return "Key " + key + " does not exists in storage " + storage; } } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java index 6976cce93..1b0ad1268 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectDeleter.java @@ -22,5 +22,5 @@ public interface ObjectDeleter { * *
<p>
If the object doesn't exist, the operation still succeeds as it is idempotent. */ - void delete(String key) throws StorageBackendException; + void delete(ObjectKey key) throws StorageBackendException; } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java index 309745f41..7c672ceeb 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java @@ -23,12 +23,12 @@ public interface ObjectFetcher { * Fetch file. * @param key file key. */ - InputStream fetch(String key) throws StorageBackendException; + InputStream fetch(ObjectKey key) throws StorageBackendException; /** * Fetch file. * @param key file key. * @param range range with inclusive start/end positions */ - InputStream fetch(String key, BytesRange range) throws StorageBackendException; + InputStream fetch(ObjectKey key, BytesRange range) throws StorageBackendException; } diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java new file mode 100644 index 000000000..d5374bd2e --- /dev/null +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectKey.java @@ -0,0 +1,21 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage; + +public interface ObjectKey { + String value(); +} diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java index a31366afe..14200861f 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java @@ -24,5 +24,5 @@ public interface ObjectUploader { * @param key path to an object within a storage backend. 
* @return number of bytes uploaded */ - long upload(InputStream inputStream, String key) throws StorageBackendException; + long upload(InputStream inputStream, ObjectKey key) throws StorageBackendException; } diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 9933e897f..5a57fab46 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; +import org.assertj.core.util.Throwables; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -28,7 +29,7 @@ public abstract class BaseStorageTest { - protected static final String TOPIC_PARTITION_SEGMENT_KEY = "topic/partition/log"; + protected static final ObjectKey TOPIC_PARTITION_SEGMENT_KEY = new TestObjectKey("topic/partition/log"); protected abstract StorageBackend storage(); @@ -80,10 +81,10 @@ void testRetryUploadKeepLatestVersion() throws StorageBackendException { @Test void testFetchFailWhenNonExistingKey() { - assertThatThrownBy(() -> storage().fetch("non-existing")) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); - assertThatThrownBy(() -> storage().fetch("non-existing", BytesRange.of(0, 1))) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"), BytesRange.of(0, 1))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); } @@ -146,14 +147,41 @@ void testFetchWithRangeOutsideFileSize() throws StorageBackendException { @Test void testFetchNonExistingKey() { - assertThatThrownBy(() -> storage().fetch("non-existing")) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); - assertThatThrownBy(() -> storage().fetch("non-existing", BytesRange.of(0, 1))) + assertThatThrownBy(() -> storage().fetch(new TestObjectKey("non-existing"), BytesRange.of(0, 1))) .isInstanceOf(KeyNotFoundException.class) .hasMessage("Key non-existing does not exists in storage " + storage()); } + @Test + void testFetchNonExistingKeyMasking() { + final ObjectKey key = new ObjectKey() { + @Override + public String value() { + return "real-key"; + } + + @Override + public String toString() { + return "masked-key"; + } + }; + + assertThatThrownBy(() -> storage().fetch(key)) + .extracting(Throwables::getStackTrace) + .asString() + .contains("masked-key") + .doesNotContain("real-key"); + + assertThatThrownBy(() -> storage().fetch(key, BytesRange.of(0, 1))) + .extracting(Throwables::getStackTrace) + .asString() + .contains("masked-key") + .doesNotContain("real-key"); + } + @Test protected void testDelete() throws StorageBackendException { storage().upload(new ByteArrayInputStream("test".getBytes()), TOPIC_PARTITION_SEGMENT_KEY); diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java new file mode 100644 index 000000000..5e04bb455 --- /dev/null +++ 
b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/TestObjectKey.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage; + +import java.util.Objects; + +public class TestObjectKey implements ObjectKey { + private final String key; + + public TestObjectKey(final String key) { + this.key = Objects.requireNonNull(key, "key cannot be null"); + } + + @Override + public String value() { + return key; + } + + @Override + public String toString() { + return key; + } +} diff --git a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java index 120dbb0b2..dac4718a0 100644 --- a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java +++ b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java @@ -27,6 +27,7 @@ import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -47,9 +48,9 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); Files.createDirectories(path.getParent()); Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING); return Files.size(path); @@ -59,21 +60,21 @@ public long upload(final InputStream inputStream, final String key) throws Stora } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); return Files.newInputStream(path); } catch (final NoSuchFileException e) { - throw new KeyNotFoundException(this, key, e); + throw new KeyNotFoundException(this, key); } catch (final IOException e) { throw new StorageBackendException("Failed to fetch " + key, e); } } @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); final long fileSize = Files.size(path); if (range.from >= fileSize) { throw new InvalidRangeException("Range start 
position " + range.from @@ -85,16 +86,16 @@ public InputStream fetch(final String key, final BytesRange range) throws Storag final long size = Math.min(range.to, fileSize) - range.from + 1; return new BoundedInputStream(chunkContent, size); } catch (final NoSuchFileException e) { - throw new KeyNotFoundException(this, key, e); + throw new KeyNotFoundException(this, key); } catch (final IOException e) { throw new StorageBackendException("Failed to fetch " + key + ", with range " + range, e); } } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { try { - final Path path = fsRoot.resolve(key); + final Path path = fsRoot.resolve(key.value()); Files.deleteIfExists(path); Path parent = path.getParent(); while (parent != null && Files.isDirectory(parent) && !parent.equals(fsRoot) diff --git a/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java b/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java index 653010052..0d41d3958 100644 --- a/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java +++ b/storage/filesystem/src/test/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorageTest.java @@ -24,6 +24,7 @@ import io.aiven.kafka.tieredstorage.storage.BaseStorageTest; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; +import io.aiven.kafka.tieredstorage.storage.TestObjectKey; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -71,7 +72,7 @@ void testRootCannotBeNonWritableDirectory() throws IOException { @Test void testDeleteAllParentsButRoot() throws IOException, StorageBackendException { - final Path keyPath = root.resolve(TOPIC_PARTITION_SEGMENT_KEY); + final Path keyPath = root.resolve(TOPIC_PARTITION_SEGMENT_KEY.value()); Files.createDirectories(keyPath.getParent()); Files.writeString(keyPath, "test"); final FileSystemStorage storage = new FileSystemStorage(); @@ -95,7 +96,7 @@ void testDeleteDoesNotRemoveParentDir() throws IOException, StorageBackendExcept Files.writeString(keyPath, "test"); final FileSystemStorage storage = new FileSystemStorage(); storage.configure(Map.of("root", root.toString())); - storage.delete(parent + "/" + key); + storage.delete(new TestObjectKey(parent + "/" + key)); assertThat(keyPath).doesNotExist(); assertThat(parentPath).exists(); diff --git a/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java b/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java index 1dd9bca03..87e67ea91 100644 --- a/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java +++ b/storage/gcs/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorageMetricsTest.java @@ -27,7 +27,9 @@ import java.util.Map; import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; +import io.aiven.kafka.tieredstorage.storage.TestObjectKey; import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; import com.google.cloud.NoCredentials; @@ -94,7 +96,7 @@ void setUp(final TestInfo testInfo) { void metricsShouldBeReported() 
throws StorageBackendException, IOException, JMException { final byte[] data = new byte[RESUMABLE_UPLOAD_CHUNK_SIZE + 1]; - final String key = "x"; + final ObjectKey key = new TestObjectKey("x"); storage.upload(new ByteArrayInputStream(data), key); try (final InputStream fetch = storage.fetch(key)) { diff --git a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java index 12d40037a..198c088ef 100644 --- a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java +++ b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java @@ -24,6 +24,7 @@ import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -57,9 +58,9 @@ public void configure(final Map configs) { } @Override - public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { try { - final BlobInfo blobInfo = BlobInfo.newBuilder(this.bucketName, key).build(); + final BlobInfo blobInfo = BlobInfo.newBuilder(this.bucketName, key.value()).build(); final Blob blob; if (resumableUploadChunkSize != null) { blob = storage.createFrom(blobInfo, inputStream, resumableUploadChunkSize); @@ -73,16 +74,16 @@ public long upload(final InputStream inputStream, final String key) throws Stora } @Override - public void delete(final String key) throws StorageBackendException { + public void delete(final ObjectKey key) throws StorageBackendException { try { - storage.delete(this.bucketName, key); + storage.delete(this.bucketName, key.value()); } catch (final StorageException e) { throw new StorageBackendException("Failed to delete " + key, e); } } @Override - public InputStream fetch(final String key) throws StorageBackendException { + public InputStream fetch(final ObjectKey key) throws StorageBackendException { try { final Blob blob = getBlob(key); final ReadChannel reader = blob.reader(); @@ -93,7 +94,7 @@ public InputStream fetch(final String key) throws StorageBackendException { @Override - public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException { + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { try { final Blob blob = getBlob(key); @@ -118,11 +119,11 @@ public InputStream fetch(final String key, final BytesRange range) throws Storag } } - private Blob getBlob(final String key) throws KeyNotFoundException { + private Blob getBlob(final ObjectKey key) throws KeyNotFoundException { // Unfortunately, it seems Google will do a separate (HEAD-like) call to get blob metadata. // Since the blobs are immutable in tiered storage, we can consider caching them locally // to avoid the extra round trip. 
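The local caching suggested in that comment could look roughly like the following: a hypothetical sketch, not part of this change, relying on the immutability noted above; the unbounded map is purely illustrative (a real cache would want size bounds or expiry):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import com.google.cloud.storage.Blob;

    // Hypothetical field and helper inside GcsStorage:
    private final ConcurrentMap<String, Blob> blobCache = new ConcurrentHashMap<>();

    private Blob getBlobCached(final ObjectKey key) throws KeyNotFoundException {
        // computeIfAbsent stores nothing when the mapping function returns null,
        // so a missing key keeps hitting the backend and still surfaces as KeyNotFoundException.
        final Blob blob = blobCache.computeIfAbsent(key.value(), k -> storage.get(this.bucketName, k));
        if (blob == null) {
            throw new KeyNotFoundException(this, key);
        }
        return blob;
    }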
-        final Blob blob = storage.get(this.bucketName, key);
+        final Blob blob = storage.get(this.bucketName, key.value());
         if (blob == null) {
             throw new KeyNotFoundException(this, key);
         }
diff --git a/storage/s3/build.gradle b/storage/s3/build.gradle
index 5056ab5b9..0a2739833 100644
--- a/storage/s3/build.gradle
+++ b/storage/s3/build.gradle
@@ -38,5 +38,8 @@ dependencies {
     testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
     testImplementation "org.testcontainers:localstack:$testcontainersVersion"
 
-    integrationTestImplementation "org.wiremock:wiremock:$wireMockVersion"
+
+    integrationTestImplementation("org.wiremock:wiremock:$wireMockVersion") {
+        exclude group: "org.slf4j"
+    }
 }
diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java
index decb99b24..fde2df0ee 100644
--- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java
+++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3ErrorMetricsTest.java
@@ -24,6 +24,8 @@
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 
+import io.aiven.kafka.tieredstorage.storage.TestObjectKey;
+
 import com.github.tomakehurst.wiremock.http.Fault;
 import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
 import com.github.tomakehurst.wiremock.junit5.WireMockTest;
@@ -84,7 +86,7 @@ void testS3ServerExceptions(final int statusCode,
                 .withHeader(CONTENT_TYPE, APPLICATION_XML.getMimeType())
                 .withBody(String.format(ERROR_RESPONSE_TEMPLATE, statusCode))));
         final S3Exception s3Exception = catchThrowableOfType(
-            () -> storage.upload(InputStream.nullInputStream(), "key"),
+            () -> storage.upload(InputStream.nullInputStream(), new TestObjectKey("key")),
             S3Exception.class);
 
         assertThat(s3Exception.statusCode()).isEqualTo(statusCode);
@@ -115,7 +117,7 @@ void testOtherExceptions(final WireMockRuntimeInfo wmRuntimeInfo) throws Excepti
                 .withHeader(CONTENT_TYPE, APPLICATION_XML.getMimeType())
                 .withBody("unparsable_xml")));
 
-        assertThatThrownBy(() -> storage.upload(InputStream.nullInputStream(), "key"))
+        assertThatThrownBy(() -> storage.upload(InputStream.nullInputStream(), new TestObjectKey("key")))
             .isInstanceOf(SdkClientException.class)
             .hasMessage("Could not parse XML response.");
 
@@ -140,7 +142,7 @@ void apiCallAttemptTimeout(final WireMockRuntimeInfo wmRuntimeInfo) throws Excep
 
         stubFor(any(anyUrl()).willReturn(aResponse().withFixedDelay(100)));
 
-        assertThatThrownBy(() -> storage.fetch("key"))
+        assertThatThrownBy(() -> storage.fetch(new TestObjectKey("key")))
            .isInstanceOf(ApiCallAttemptTimeoutException.class)
            .hasMessage("HTTP request execution did not complete before the specified timeout configuration: 1 millis");
 
@@ -169,7 +171,7 @@ void ioErrors(final WireMockRuntimeInfo wmRuntimeInfo) throws Exception {
                     .withStatus(HttpStatusCode.OK)
                     .withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
 
-        assertThatThrownBy(() -> storage.fetch("key"))
+        assertThatThrownBy(() -> storage.fetch(new TestObjectKey("key")))
            .isInstanceOf(SdkClientException.class)
            .hasMessage("Unable to execute HTTP request: null");
 
diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
index fe1bbf40a..406b4d850 100644
--- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
+++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
@@ -26,6 +26,8 @@
 import java.util.Map;
 
 import io.aiven.kafka.tieredstorage.storage.BytesRange;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+import io.aiven.kafka.tieredstorage.storage.TestObjectKey;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -99,7 +101,7 @@ void setupStorage() {
     void metricsShouldBeReported() throws Exception {
         final byte[] data = new byte[PART_SIZE + 1];
 
-        final String key = "x";
+        final ObjectKey key = new TestObjectKey("x");
 
         storage.upload(new ByteArrayInputStream(data), key);
         try (final InputStream fetch = storage.fetch(key)) {
diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java
index 1e2d430ad..d2f0d02a5 100644
--- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java
+++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java
@@ -21,6 +21,7 @@
 import io.aiven.kafka.tieredstorage.storage.BaseStorageTest;
 import io.aiven.kafka.tieredstorage.storage.StorageBackend;
+import io.aiven.kafka.tieredstorage.storage.TestObjectKey;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -90,7 +91,7 @@ protected StorageBackend storage() {
 
     @Test
     void partSizePassedToStream() throws IOException {
-        try (final var os = ((S3Storage) storage()).s3OutputStream("test")) {
+        try (final var os = ((S3Storage) storage()).s3OutputStream(new TestObjectKey("test"))) {
             assertThat(os.partSize).isEqualTo(PART_SIZE);
         }
     }
diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java
index fa71f82dc..af5806698 100644
--- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java
+++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java
@@ -24,6 +24,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.RequestBody;
@@ -52,7 +54,7 @@ public class S3MultiPartOutputStream extends OutputStream {
     private final S3Client client;
     private final ByteBuffer partBuffer;
     private final String bucketName;
-    private final String key;
+    private final ObjectKey key;
     final int partSize;
 
     private final String uploadId;
@@ -62,7 +64,7 @@ public class S3MultiPartOutputStream extends OutputStream {
     private long processedBytes;
 
     public S3MultiPartOutputStream(final String bucketName,
-                                   final String key,
+                                   final ObjectKey key,
                                    final int partSize,
                                    final S3Client client) {
         this.bucketName = bucketName;
@@ -71,7 +73,7 @@ public S3MultiPartOutputStream(final String bucketName,
         this.partSize = partSize;
         this.partBuffer = ByteBuffer.allocate(partSize);
         final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName)
-            .key(key).build();
+            .key(key.value()).build();
         final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest);
         log.debug("Create new multipart upload request: {}", initiateResult.uploadId());
         this.uploadId = initiateResult.uploadId();
@@ -147,7 +149,7 @@ private void completeUpload() {
             .build();
         final var request = CompleteMultipartUploadRequest.builder()
             .bucket(bucketName)
-            .key(key)
+            .key(key.value())
             .uploadId(uploadId)
             .multipartUpload(completedMultipartUpload)
             .build();
@@ -158,7 +160,7 @@ private void completeUpload() {
     private void abortUpload() {
         final var request = AbortMultipartUploadRequest.builder()
             .bucket(bucketName)
-            .key(key)
+            .key(key.value())
             .uploadId(uploadId)
             .build();
         client.abortMultipartUpload(request);
@@ -177,7 +179,7 @@ private void uploadPart(final InputStream in, final int actualPartSize) {
 
         final UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
             .bucket(bucketName)
-            .key(key)
+            .key(key.value())
             .uploadId(uploadId)
             .partNumber(partNumber)
             .build();
diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
index f31ef8a02..5f5d1b2b5 100644
--- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
+++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
@@ -23,6 +23,7 @@
 import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.InvalidRangeException;
 import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackend;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
 
@@ -46,7 +47,7 @@ public void configure(final Map<String, ?> configs) {
     }
 
     @Override
-    public long upload(final InputStream inputStream, final String key) throws StorageBackendException {
+    public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException {
         try (final var out = s3OutputStream(key)) {
             inputStream.transferTo(out);
             return out.processedBytes();
@@ -55,14 +56,14 @@ public long upload(final InputStream inputStream, final Stora
         }
     }
 
-    S3MultiPartOutputStream s3OutputStream(final String key) {
+    S3MultiPartOutputStream s3OutputStream(final ObjectKey key) {
         return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client);
     }
 
     @Override
-    public void delete(final String key) throws StorageBackendException {
+    public void delete(final ObjectKey key) throws StorageBackendException {
         try {
-            final var deleteRequest = DeleteObjectRequest.builder().bucket(bucketName).key(key).build();
+            final var deleteRequest = DeleteObjectRequest.builder().bucket(bucketName).key(key.value()).build();
             s3Client.deleteObject(deleteRequest);
         } catch (final AwsServiceException e) {
             throw new StorageBackendException("Failed to delete " + key, e);
@@ -70,8 +71,8 @@ public void delete(final String key) throws StorageBackendException {
     }
 
     @Override
-    public InputStream fetch(final String key) throws StorageBackendException {
-        final GetObjectRequest getRequest = GetObjectRequest.builder().bucket(bucketName).key(key).build();
+    public InputStream fetch(final ObjectKey key) throws StorageBackendException {
+        final GetObjectRequest getRequest = GetObjectRequest.builder().bucket(bucketName).key(key.value()).build();
         try {
             return s3Client.getObject(getRequest);
         } catch (final AwsServiceException e) {
@@ -84,11 +85,11 @@ public InputStream fetch(final String key) throws StorageBackendException {
     }
 
     @Override
-    public InputStream fetch(final String key, final BytesRange range) throws StorageBackendException {
+    public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException {
         try {
             final GetObjectRequest getRequest = GetObjectRequest.builder()
                 .bucket(bucketName)
-                .key(key)
+                .key(key.value())
                 .range(range.toString())
                 .build();
             return s3Client.getObject(getRequest);
diff --git a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java
index 51c44ebc4..d95824584 100644
--- a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java
+++ b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java
@@ -21,6 +21,9 @@
 import java.util.List;
 import java.util.Random;
 
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+import io.aiven.kafka.tieredstorage.storage.TestObjectKey;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -51,7 +54,7 @@
 @ExtendWith(MockitoExtension.class)
 class S3MultiPartOutputStreamTest {
     private static final String BUCKET_NAME = "some_bucket";
-    private static final String FILE_KEY = "some_key";
+    private static final ObjectKey FILE_KEY = new TestObjectKey("some_key");
     private static final String UPLOAD_ID = "some_upload_id";
 
     @Mock
@@ -327,7 +330,7 @@ private static void assertUploadPartRequest(final UploadPartRe
         assertThat(uploadPartRequest.uploadId()).isEqualTo(UPLOAD_ID);
         assertThat(uploadPartRequest.partNumber()).isEqualTo(expectedPartNumber);
         assertThat(uploadPartRequest.bucket()).isEqualTo(BUCKET_NAME);
-        assertThat(uploadPartRequest.key()).isEqualTo(FILE_KEY);
+        assertThat(uploadPartRequest.key()).isEqualTo(FILE_KEY.value());
         assertThat(requestBody.optionalContentLength()).hasValue(expectedPartSize);
         assertThat(requestBody.contentStreamProvider().newStream()).hasBinaryContent(expectedBytes);
     }
@@ -335,14 +338,14 @@ private static void assertUploadPartRequest(final UploadPartRe
     private static void assertCompleteMultipartUploadRequest(final CompleteMultipartUploadRequest request,
                                                              final List<CompletedPart> expectedETags) {
         assertThat(request.bucket()).isEqualTo(BUCKET_NAME);
-        assertThat(request.key()).isEqualTo(FILE_KEY);
+        assertThat(request.key()).isEqualTo(FILE_KEY.value());
         assertThat(request.uploadId()).isEqualTo(UPLOAD_ID);
         assertThat(request.multipartUpload().parts()).containsExactlyElementsOf(expectedETags);
     }
 
     private static void assertAbortMultipartUploadRequest(final AbortMultipartUploadRequest request) {
         assertThat(request.bucket()).isEqualTo(BUCKET_NAME);
-        assertThat(request.key()).isEqualTo(FILE_KEY);
+        assertThat(request.key()).isEqualTo(FILE_KEY.value());
         assertThat(request.uploadId()).isEqualTo(UPLOAD_ID);
     }
 }
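
Reviewer note: this change only touches call sites, so the shape of the new
io.aiven.kafka.tieredstorage.storage.ObjectKey type has to be inferred from
key.value() and new TestObjectKey("key"). Below is a minimal sketch consistent
with those call sites; the null check, the toString() behaviour, and the
TestObjectKeySketch name are assumptions, not the project's actual code.

    package io.aiven.kafka.tieredstorage.storage;

    import java.util.Objects;

    // Assumed contract: a typed object key whose raw string form is handed
    // to the backend SDKs (S3 key, GCS blob name, filesystem path).
    public interface ObjectKey {
        String value();
    }

    // Test-helper sketch in the spirit of the TestObjectKey used above:
    // a trivial wrapper so tests can pass a typed key instead of a String.
    final class TestObjectKeySketch implements ObjectKey {
        private final String key;

        TestObjectKeySketch(final String key) {
            this.key = Objects.requireNonNull(key, "key cannot be null");
        }

        @Override
        public String value() {
            return key;
        }

        @Override
        public String toString() {
            // Keeps messages such as "Failed to delete " + key readable.
            return key;
        }
    }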
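
The migration pattern is then uniform at every backend boundary: the typed key
flows through the StorageBackend API and is unwrapped with value() only where
an SDK request is built. A sketch of a migrated call site, using the
hypothetical TestObjectKeySketch helper above and the upload/fetch signatures
from this diff (the key string is an arbitrary example):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    final class RoundTripExample {
        // storage is any StorageBackend (e.g. an S3Storage configured as above).
        static void roundTrip(final StorageBackend storage, final byte[] data) throws Exception {
            final ObjectKey key = new TestObjectKeySketch("topic-0/00000000000000000000.log");
            storage.upload(new ByteArrayInputStream(data), key); // backend calls key.value()
            try (InputStream in = storage.fetch(key)) {
                in.readAllBytes(); // drain the fetched object
            }
        }
    }

Passing the value object rather than the raw string lets error paths
(KeyNotFoundException, "Failed to delete " + key) format the key via
toString(), while the request builders stay on value().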