From ba077a3eeb8e0fbb7d00b8e0ecd26f6dd011faf3 Mon Sep 17 00:00:00 2001 From: Ivan Yurchenko Date: Wed, 27 Sep 2023 16:08:30 +0300 Subject: [PATCH] Rename ObjectKey -> ObjectKeyFactory --- .../RemoteStorageManagerTest.java | 38 +-- .../{ObjectKey.java => ObjectKeyFactory.java} | 6 +- .../tieredstorage/RemoteStorageManager.java | 39 +-- .../SegmentCustomMetadataBuilder.java | 14 +- .../metadata/SegmentCustomMetadataField.java | 6 +- .../kafka/tieredstorage/metrics/Metrics.java | 8 +- .../metrics/MetricsRegistry.java | 14 +- .../tieredstorage/ObjectKeyFactoryTest.java | 245 ++++++++++++++++++ .../kafka/tieredstorage/ObjectKeyTest.java | 217 ---------------- .../RemoteStorageManagerMetricsTest.java | 2 +- .../SegmentCustomMetadataBuilderTest.java | 24 +- 11 files changed, 323 insertions(+), 290 deletions(-) rename core/src/main/java/io/aiven/kafka/tieredstorage/{ObjectKey.java => ObjectKeyFactory.java} (97%) create mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java delete mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index b7fd1f4ef..49b8c07c5 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -480,14 +480,15 @@ void testFetchingSegmentFileNonExistent() throws IOException { ); rsm.configure(config); - final ObjectKey objectKey = new ObjectKey(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); // Ensure the manifest exists. - writeManifest(objectKey); + writeManifest(objectKeyFactory); // Make sure the exception is connected to the log file. - final String expectedMessage = - "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage"; + final String expectedMessage = "Key " + + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG) + + " does not exists in storage"; assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0)) .isInstanceOf(RemoteResourceNotFoundException.class) @@ -509,9 +510,10 @@ void testFetchingSegmentManifestNotFound() { rsm.configure(config); // Make sure the exception is connected to the manifest file. - final ObjectKey objectKey = new ObjectKey(""); - final String expectedMessage = - "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage"; + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final String expectedMessage = "Key " + + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + + " does not exists in storage"; assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0)) .isInstanceOf(RemoteResourceNotFoundException.class) @@ -533,15 +535,15 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException ); rsm.configure(config); - final ObjectKey objectKey = new ObjectKey(""); + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); // Ensure the manifest exists. - writeManifest(objectKey); + writeManifest(objectKeyFactory); // Make sure the exception is connected to the index file. - final String expectedMessage = - "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType)) - + " does not exists in storage"; + final String expectedMessage = "Key " + + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.fromIndexType(indexType)) + + " does not exists in storage"; assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType)) .isInstanceOf(RemoteResourceNotFoundException.class) @@ -560,9 +562,10 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage rsm.configure(config); // Make sure the exception is connected to the manifest file. - final ObjectKey objectKey = new ObjectKey(""); - final String expectedMessage = - "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage"; + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(""); + final String expectedMessage = "Key " + + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + + " does not exists in storage"; assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType)) .isInstanceOf(RemoteResourceNotFoundException.class) @@ -570,14 +573,15 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage .hasStackTraceContaining(expectedMessage); } - private void writeManifest(final ObjectKey objectKey) throws IOException { + private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOException { // Ensure the manifest exists. final String manifest = "{\"version\":\"1\"," + "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100," + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110}," + "\"compression\":false}"; - final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST)); + final Path manifestPath = targetDir.resolve( + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)); Files.createDirectories(manifestPath.getParent()); Files.writeString(manifestPath, manifest); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java similarity index 97% rename from core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java rename to core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java index a695217ff..0ba09e107 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java @@ -31,7 +31,7 @@ /** * Maps Kafka segment files to object paths/keys in the storage backend. */ -public final class ObjectKey { +public final class ObjectKeyFactory { /** * Supported files and extensions, including log, index types, and segment manifest. @@ -68,7 +68,7 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) { private final String prefix; - public ObjectKey(final String prefix) { + public ObjectKeyFactory(final String prefix) { this.prefix = prefix == null ? "" : prefix; } @@ -82,7 +82,7 @@ public ObjectKey(final String prefix) { *

For example: * {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log} * - * @see ObjectKey#mainPath(RemoteLogSegmentMetadata) + * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata) */ public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) { Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null"); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 85bea09f9..cfddd1071 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -113,7 +113,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote. private AesEncryptionProvider aesEncryptionProvider; private ObjectMapper mapper; private ChunkManager chunkManager; - private ObjectKey objectKey; + private ObjectKeyFactory objectKeyFactory; private SegmentCustomMetadataSerde customMetadataSerde; private Set customMetadataFields; @@ -138,7 +138,7 @@ public void configure(final Map configs) { .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG))); metrics = new Metrics(time, metricConfig); setStorage(config.storage()); - objectKey = new ObjectKey(config.keyPrefix()); + objectKeyFactory = new ObjectKeyFactory(config.keyPrefix()); encryptionEnabled = config.encryptionEnabled(); if (encryptionEnabled) { final Map keyRing = new HashMap<>(); @@ -198,7 +198,7 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat remoteLogSegmentMetadata.segmentSizeInBytes()); final var customMetadataBuilder = - new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata); + new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata); final long startedMs = time.milliseconds(); @@ -291,15 +291,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet final TransformFinisher transformFinisher, final SegmentCustomMetadataBuilder customMetadataBuilder) throws IOException, StorageBackendException { - final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG); + final String fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); try (final var sis = transformFinisher.toInputStream()) { final var bytes = uploader.upload(sis, fileKey); metrics.recordObjectUpload( remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), - ObjectKey.Suffix.LOG, + ObjectKeyFactory.Suffix.LOG, bytes ); - customMetadataBuilder.addUploadResult(ObjectKey.Suffix.LOG, bytes); + customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes); log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes); } @@ -321,8 +321,8 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta final TransformFinisher transformFinisher = new TransformFinisher(transformEnum); - final var suffix = ObjectKey.Suffix.fromIndexType(indexType); - final String key = objectKey.key(remoteLogSegmentMetadata, suffix); + final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType); + final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); try (final var in = transformFinisher.toInputStream()) { final var bytes = uploader.upload(in, key); metrics.recordObjectUpload( @@ -341,16 +341,16 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad final SegmentCustomMetadataBuilder customMetadataBuilder) throws StorageBackendException, IOException { final String manifest = mapper.writeValueAsString(segmentManifest); - final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST); + final String manifestFileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) { final var bytes = uploader.upload(manifestContent, manifestFileKey); metrics.recordObjectUpload( remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), - ObjectKey.Suffix.MANIFEST, + ObjectKeyFactory.Suffix.MANIFEST, bytes ); - customMetadataBuilder.addUploadResult(ObjectKey.Suffix.MANIFEST, bytes); + customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, bytes); log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes); } @@ -384,7 +384,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata); - final var suffix = ObjectKey.Suffix.LOG; + final var suffix = ObjectKeyFactory.Suffix.LOG; final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix); return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range) @@ -404,7 +404,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata); - final var key = objectKey(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)); + final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.fromIndexType(indexType)); final var in = fetcher.fetch(key); DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in); @@ -425,21 +425,22 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet } } - private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) { + private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final ObjectKeyFactory.Suffix suffix) { final String segmentKey; if (remoteLogSegmentMetadata.customMetadata().isPresent()) { final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get(); final var fields = customMetadataSerde.deserialize(customMetadataBytes.value()); - segmentKey = objectKey.key(fields, remoteLogSegmentMetadata, suffix); + segmentKey = objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix); } else { - segmentKey = objectKey.key(remoteLogSegmentMetadata, suffix); + segmentKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); } return segmentKey; } private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws StorageBackendException, IOException { - final String manifestKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST); + final String manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); return segmentManifestProvider.get(manifestKey); } @@ -455,8 +456,8 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment final long startedMs = time.milliseconds(); try { - for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) { - final String key = objectKey.key(remoteLogSegmentMetadata, suffix); + for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) { + final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix); deleter.delete(key); } } catch (final Exception e) { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java index b6176f7b7..88841e53e 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java @@ -23,25 +23,25 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; public class SegmentCustomMetadataBuilder { - final ObjectKey objectKey; + final ObjectKeyFactory objectKeyFactory; final RemoteLogSegmentMetadata segmentMetadata; - final EnumMap uploadResults; + final EnumMap uploadResults; final Set fields; public SegmentCustomMetadataBuilder(final Set fields, - final ObjectKey objectKey, + final ObjectKeyFactory objectKeyFactory, final RemoteLogSegmentMetadata segmentMetadata) { this.fields = fields; - this.objectKey = objectKey; + this.objectKeyFactory = objectKeyFactory; this.segmentMetadata = segmentMetadata; - this.uploadResults = new EnumMap<>(ObjectKey.Suffix.class); + this.uploadResults = new EnumMap<>(ObjectKeyFactory.Suffix.class); } - public SegmentCustomMetadataBuilder addUploadResult(final ObjectKey.Suffix suffix, + public SegmentCustomMetadataBuilder addUploadResult(final ObjectKeyFactory.Suffix suffix, final long bytes) { if (uploadResults.containsKey(suffix)) { throw new IllegalArgumentException("Upload results for suffix " + suffix + " already added"); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java index 284cd8bd6..6e63bf0d3 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java @@ -24,13 +24,13 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Type; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; // index define values on custom metadata fields and cannot be changed without breaking compatibility. public enum SegmentCustomMetadataField { REMOTE_SIZE(0, new Field("remote_size", Type.VARLONG), SegmentCustomMetadataBuilder::totalSize), - OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKey.prefix()), - OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKey.mainPath(b.segmentMetadata)); + OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKeyFactory.prefix()), + OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKeyFactory.mainPath(b.segmentMetadata)); static final TaggedFieldsSection FIELDS_SECTION = TaggedFieldsSection.of( REMOTE_SIZE.index, REMOTE_SIZE.field, diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java index 1a7d22fd9..db680578e 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -300,14 +300,14 @@ private void recordSegmentFetchRequests(final TopicPartition topicPartition) { .record(); } - public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix, + public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix, final long bytes) { recordObjectUploadRequests(topicPartition, suffix); recordObjectUploadBytes(topicPartition, suffix, bytes); } private void recordObjectUploadBytes(final TopicPartition topicPartition, - final ObjectKey.Suffix suffix, + final ObjectKeyFactory.Suffix suffix, final long bytes) { new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES)) .with(metricsRegistry.objectUploadBytesRate, new Rate()) @@ -348,7 +348,7 @@ private void recordObjectUploadBytes(final TopicPartition topicPartition, .record(bytes); } - private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) { + private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix) { new SensorProvider(metrics, sensorName(OBJECT_UPLOAD)) .with(metricsRegistry.objectUploadRequestsRate, new Rate()) .with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount()) diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java index e0d053726..e82efcde6 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.TopicPartition; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; public class MetricsRegistry { @@ -241,7 +241,7 @@ public static String sensorName(final String name) { return name; } - public static String sensorNameByObjectType(final ObjectKey.Suffix suffix, final String name) { + public static String sensorNameByObjectType(final ObjectKeyFactory.Suffix suffix, final String name) { return TAG_NAME_OBJECT_TYPE + "." + suffix.value + "." + name; } @@ -250,7 +250,7 @@ public static String sensorNameByTopic(final TopicPartition topicPartition, fina } public static String sensorNameByTopicAndObjectType(final TopicPartition topicPartition, - final ObjectKey.Suffix suffix, + final ObjectKeyFactory.Suffix suffix, final String name) { return TAG_NAME_TOPIC + "." + topicPartition.topic() + "." + TAG_NAME_OBJECT_TYPE + "." + suffix.value @@ -264,7 +264,7 @@ public static String sensorNameByTopicPartition(final TopicPartition topicPartit } public static String sensorNameByTopicPartitionAndObjectType(final TopicPartition topicPartition, - final ObjectKey.Suffix suffix, + final ObjectKeyFactory.Suffix suffix, final String name) { return TAG_NAME_TOPIC + "." + topicPartition.topic() + "." + TAG_NAME_PARTITION + "." + topicPartition.partition() @@ -277,7 +277,7 @@ static Map topicTags(final TopicPartition topicPartition) { } static Map topicAndObjectTypeTags(final TopicPartition topicPartition, - final ObjectKey.Suffix suffix) { + final ObjectKeyFactory.Suffix suffix) { return Map.of( TAG_NAME_TOPIC, topicPartition.topic(), TAG_NAME_OBJECT_TYPE, suffix.value @@ -292,7 +292,7 @@ static Map topicPartitionTags(final TopicPartition topicPartitio } static Map topicPartitionAndObjectTypeTags(final TopicPartition topicPartition, - final ObjectKey.Suffix suffix) { + final ObjectKeyFactory.Suffix suffix) { return Map.of( TAG_NAME_TOPIC, topicPartition.topic(), TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()), @@ -300,7 +300,7 @@ static Map topicPartitionAndObjectTypeTags(final TopicPartition ); } - static Map objectTypeTags(final ObjectKey.Suffix suffix) { + static Map objectTypeTags(final ObjectKeyFactory.Suffix suffix) { return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java new file mode 100644 index 000000000..50a4ec5e7 --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java @@ -0,0 +1,245 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage; + +import java.util.Map; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; + +import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ObjectKeyFactoryTest { + static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID; // string representation: AAAAAAAAAAAAAAAAAAAAAQ + static final Uuid SEGMENT_ID = Uuid.ZERO_UUID; // string representation: AAAAAAAAAAAAAAAAAAAAAA + static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7)); + static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID); + static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata( + REMOTE_LOG_SEGMENT_ID, 1234L, 2000L, + 0, 0, 0, 0, Map.of(0, 0L)); + + @Test + void test() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)) + .isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); + } + + @Test + void withCustomFieldsEmpty() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final Map fields = Map.of(); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + ).isEqualTo( + "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); + } + + @Test + void withCustomFieldsOnlyPrefix() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + ).isEqualTo( + "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" + + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); + } + + @Test + void withCustomFieldsOnlyKey() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + ).isEqualTo("prefix/topic/7/file.log"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + ).isEqualTo("prefix/topic/7/file.index"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + ).isEqualTo("prefix/topic/7/file.timeindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + ).isEqualTo("prefix/topic/7/file.snapshot"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + ).isEqualTo("prefix/topic/7/file.txnindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + ).isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + ).isEqualTo("prefix/topic/7/file.rsm-manifest"); + } + + @Test + void withCustomFieldsAll() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/"); + final Map fields = Map.of( + SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/", + SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG) + ).isEqualTo("other/topic/7/file.log"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX) + ).isEqualTo("other/topic/7/file.index"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX) + ).isEqualTo("other/topic/7/file.timeindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + ).isEqualTo("other/topic/7/file.snapshot"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX) + ).isEqualTo("other/topic/7/file.txnindex"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + ).isEqualTo("other/topic/7/file.leader-epoch-checkpoint"); + assertThat( + objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST) + ).isEqualTo("other/topic/7/file.rsm-manifest"); + } + + @Test + void nullPrefix() { + final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null); + assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)) + .isEqualTo( + "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); + } + + @Test + void suffixForIndexTypes() { + assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET)) + .isEqualTo(ObjectKeyFactory.Suffix.OFFSET_INDEX) + .extracting("value") + .isEqualTo("index"); + assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP)) + .isEqualTo(ObjectKeyFactory.Suffix.TIME_INDEX) + .extracting("value") + .isEqualTo("timeindex"); + assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)) + .isEqualTo(ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT) + .extracting("value") + .isEqualTo("snapshot"); + assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION)) + .isEqualTo(ObjectKeyFactory.Suffix.TXN_INDEX) + .extracting("value") + .isEqualTo("txnindex"); + assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH)) + .isEqualTo(ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT) + .extracting("value") + .isEqualTo("leader-epoch-checkpoint"); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java deleted file mode 100644 index 4ed07a959..000000000 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright 2023 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.tieredstorage; - -import java.util.Map; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; - -import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -class ObjectKeyTest { - static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID; // string representation: AAAAAAAAAAAAAAAAAAAAAQ - static final Uuid SEGMENT_ID = Uuid.ZERO_UUID; // string representation: AAAAAAAAAAAAAAAAAAAAAA - static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7)); - static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID); - static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata( - REMOTE_LOG_SEGMENT_ID, 1234L, 2000L, - 0, 0, 0, 0, Map.of(0, 0L)); - - @Test - void test() { - final ObjectKey objectKey = new ObjectKey("prefix/"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); - } - - @Test - void withCustomFieldsEmpty() { - final ObjectKey objectKey = new ObjectKey("prefix/"); - final Map fields = Map.of(); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST)) - .isEqualTo( - "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); - } - - @Test - void withCustomFieldsOnlyPrefix() { - final ObjectKey objectKey = new ObjectKey("prefix/"); - final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST)) - .isEqualTo( - "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/" - + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest"); - } - - @Test - void withCustomFieldsOnlyKey() { - final ObjectKey objectKey = new ObjectKey("prefix/"); - final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo("prefix/topic/7/file.log"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX)) - .isEqualTo("prefix/topic/7/file.index"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX)) - .isEqualTo("prefix/topic/7/file.timeindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT)) - .isEqualTo("prefix/topic/7/file.snapshot"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX)) - .isEqualTo("prefix/topic/7/file.txnindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)) - .isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST)) - .isEqualTo("prefix/topic/7/file.rsm-manifest"); - } - - @Test - void withCustomFieldsAll() { - final ObjectKey objectKey = new ObjectKey("prefix/"); - final Map fields = Map.of( - SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/", - SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo("other/topic/7/file.log"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX)) - .isEqualTo("other/topic/7/file.index"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX)) - .isEqualTo("other/topic/7/file.timeindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT)) - .isEqualTo("other/topic/7/file.snapshot"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX)) - .isEqualTo("other/topic/7/file.txnindex"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)) - .isEqualTo("other/topic/7/file.leader-epoch-checkpoint"); - assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST)) - .isEqualTo("other/topic/7/file.rsm-manifest"); - } - - @Test - void nullPrefix() { - final ObjectKey objectKey = new ObjectKey(null); - assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG)) - .isEqualTo( - "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log"); - } - - @Test - void suffixForIndexTypes() { - assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET)) - .isEqualTo(ObjectKey.Suffix.OFFSET_INDEX) - .extracting("value") - .isEqualTo("index"); - assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP)) - .isEqualTo(ObjectKey.Suffix.TIME_INDEX) - .extracting("value") - .isEqualTo("timeindex"); - assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT)) - .isEqualTo(ObjectKey.Suffix.PRODUCER_SNAPSHOT) - .extracting("value") - .isEqualTo("snapshot"); - assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION)) - .isEqualTo(ObjectKey.Suffix.TXN_INDEX) - .extracting("value") - .isEqualTo("txnindex"); - assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH)) - .isEqualTo(ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT) - .extracting("value") - .isEqualTo("leader-epoch-checkpoint"); - } -} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java index ba894af96..825d30374 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java @@ -153,7 +153,7 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-rate")) .isEqualTo(1575.0 / METRIC_TIME_WINDOW_SEC); - for (final var suffix : ObjectKey.Suffix.values()) { + for (final var suffix : ObjectKeyFactory.Suffix.values()) { final ObjectName storageMetricsName = ObjectName.getInstance(objectName + ",object-type=" + suffix.value); switch (suffix) { case TXN_INDEX: diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java index 74bc9ec99..2a8ce1af9 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; -import io.aiven.kafka.tieredstorage.ObjectKey; +import io.aiven.kafka.tieredstorage.ObjectKeyFactory; import org.junit.jupiter.api.Test; @@ -43,24 +43,24 @@ class SegmentCustomMetadataBuilderTest { SEGMENT_ID), 1, 100, -1, -1, 1L, 100, Collections.singletonMap(1, 100L)); - static final ObjectKey OBJECT_KEY = new ObjectKey("p1"); + static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1"); @Test void shouldBuildEmptyMap() { - final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); assertThat(b.build()).isEmpty(); // even when upload results are added - b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L); + b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L); assertThat(b.build()).isEmpty(); } @Test void shouldFailWhenAddingExistingSuffixUploadResult() { - final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); - b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L); - assertThatThrownBy(() -> b.addUploadResult(ObjectKey.Suffix.MANIFEST, 20L)) + b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L); + assertThatThrownBy(() -> b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 20L)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Upload results for suffix MANIFEST already added"); } @@ -68,15 +68,15 @@ void shouldFailWhenAddingExistingSuffixUploadResult() { @Test void shouldIncludeTotalSize() { final var field = SegmentCustomMetadataField.REMOTE_SIZE; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, 0L)); // i.e. no upload results // but, when existing upload results b - .addUploadResult(ObjectKey.Suffix.LOG, 40L) - .addUploadResult(ObjectKey.Suffix.MANIFEST, 2L); + .addUploadResult(ObjectKeyFactory.Suffix.LOG, 40L) + .addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 2L); fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, 42L)); // i.e. sum @@ -85,7 +85,7 @@ void shouldIncludeTotalSize() { @Test void shouldIncludeObjectPrefix() { final var field = SegmentCustomMetadataField.OBJECT_PREFIX; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); final var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, "p1")); @@ -94,7 +94,7 @@ void shouldIncludeObjectPrefix() { @Test void shouldIncludeObjectKey() { final var field = SegmentCustomMetadataField.OBJECT_KEY; - final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA); + final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA); final var fields = b.build(); assertThat(fields) .containsExactly(entry(field.index, "topic-" + TOPIC_ID + "/0/00000000000000000001-" + SEGMENT_ID));