diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
index b7fd1f4ef..49b8c07c5 100644
--- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
+++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
@@ -480,14 +480,15 @@ void testFetchingSegmentFileNonExistent() throws IOException {
);
rsm.configure(config);
- final ObjectKey objectKey = new ObjectKey("");
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
// Ensure the manifest exists.
- writeManifest(objectKey);
+ writeManifest(objectKeyFactory);
// Make sure the exception is connected to the log file.
- final String expectedMessage =
- "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.LOG) + " does not exists in storage";
+ final String expectedMessage = "Key "
+ + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.LOG)
+ + " does not exists in storage";
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -509,9 +510,10 @@ void testFetchingSegmentManifestNotFound() {
rsm.configure(config);
// Make sure the exception is connected to the manifest file.
- final ObjectKey objectKey = new ObjectKey("");
- final String expectedMessage =
- "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
+ final String expectedMessage = "Key "
+ + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ + " does not exists in storage";
assertThatThrownBy(() -> rsm.fetchLogSegment(REMOTE_LOG_METADATA, 0))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -533,15 +535,15 @@ void testFetchingIndexNonExistent(final IndexType indexType) throws IOException
);
rsm.configure(config);
- final ObjectKey objectKey = new ObjectKey("");
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
// Ensure the manifest exists.
- writeManifest(objectKey);
+ writeManifest(objectKeyFactory);
// Make sure the exception is connected to the index file.
- final String expectedMessage =
- "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.fromIndexType(indexType))
- + " does not exists in storage";
+ final String expectedMessage = "Key "
+ + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.fromIndexType(indexType))
+ + " does not exists in storage";
assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -560,9 +562,10 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
rsm.configure(config);
// Make sure the exception is connected to the manifest file.
- final ObjectKey objectKey = new ObjectKey("");
- final String expectedMessage =
- "Key " + objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST) + " does not exists in storage";
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("");
+ final String expectedMessage = "Key "
+ + objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ + " does not exists in storage";
assertThatThrownBy(() -> rsm.fetchIndex(REMOTE_LOG_METADATA, indexType))
.isInstanceOf(RemoteResourceNotFoundException.class)
@@ -570,14 +573,15 @@ void testFetchingIndexManifestNotFound(final IndexType indexType) throws Storage
.hasStackTraceContaining(expectedMessage);
}
- private void writeManifest(final ObjectKey objectKey) throws IOException {
+ private void writeManifest(final ObjectKeyFactory objectKeyFactory) throws IOException {
// Ensure the manifest exists.
final String manifest =
"{\"version\":\"1\","
+ "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100,"
+ "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110},"
+ "\"compression\":false}";
- final Path manifestPath = targetDir.resolve(objectKey.key(REMOTE_LOG_METADATA, ObjectKey.Suffix.MANIFEST));
+ final Path manifestPath = targetDir.resolve(
+ objectKeyFactory.key(REMOTE_LOG_METADATA, ObjectKeyFactory.Suffix.MANIFEST));
Files.createDirectories(manifestPath.getParent());
Files.writeString(manifestPath, manifest);
}
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
similarity index 97%
rename from core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java
rename to core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
index a695217ff..0ba09e107 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKey.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/ObjectKeyFactory.java
@@ -31,7 +31,7 @@
/**
* Maps Kafka segment files to object paths/keys in the storage backend.
*/
-public final class ObjectKey {
+public final class ObjectKeyFactory {
/**
* Supported files and extensions, including log, index types, and segment manifest.
@@ -68,7 +68,7 @@ static Suffix fromIndexType(final RemoteStorageManager.IndexType indexType) {
private final String prefix;
- public ObjectKey(final String prefix) {
+ public ObjectKeyFactory(final String prefix) {
this.prefix = prefix == null ? "" : prefix;
}
@@ -82,7 +82,7 @@ public ObjectKey(final String prefix) {
*
For example:
* {@code someprefix/topic-MWJ6FHTfRYy67jzwZdeqSQ/7/00000000000000001234-tqimKeZwStOEOwRzT3L5oQ.log}
*
- * @see ObjectKey#mainPath(RemoteLogSegmentMetadata)
+ * @see ObjectKeyFactory#mainPath(RemoteLogSegmentMetadata)
*/
public String key(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final Suffix suffix) {
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata cannot be null");
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 85bea09f9..cfddd1071 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -113,7 +113,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private AesEncryptionProvider aesEncryptionProvider;
private ObjectMapper mapper;
private ChunkManager chunkManager;
- private ObjectKey objectKey;
+ private ObjectKeyFactory objectKeyFactory;
private SegmentCustomMetadataSerde customMetadataSerde;
private Set customMetadataFields;
@@ -138,7 +138,7 @@ public void configure(final Map configs) {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)));
metrics = new Metrics(time, metricConfig);
setStorage(config.storage());
- objectKey = new ObjectKey(config.keyPrefix());
+ objectKeyFactory = new ObjectKeyFactory(config.keyPrefix());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
final Map keyRing = new HashMap<>();
@@ -198,7 +198,7 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat
remoteLogSegmentMetadata.segmentSizeInBytes());
final var customMetadataBuilder =
- new SegmentCustomMetadataBuilder(customMetadataFields, objectKey, remoteLogSegmentMetadata);
+ new SegmentCustomMetadataBuilder(customMetadataFields, objectKeyFactory, remoteLogSegmentMetadata);
final long startedMs = time.milliseconds();
@@ -291,15 +291,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
final TransformFinisher transformFinisher,
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws IOException, StorageBackendException {
- final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
+ final String fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(sis, fileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
- ObjectKey.Suffix.LOG,
+ ObjectKeyFactory.Suffix.LOG,
bytes
);
- customMetadataBuilder.addUploadResult(ObjectKey.Suffix.LOG, bytes);
+ customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes);
log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
@@ -321,8 +321,8 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
final TransformFinisher transformFinisher =
new TransformFinisher(transformEnum);
- final var suffix = ObjectKey.Suffix.fromIndexType(indexType);
- final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+ final var suffix = ObjectKeyFactory.Suffix.fromIndexType(indexType);
+ final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
try (final var in = transformFinisher.toInputStream()) {
final var bytes = uploader.upload(in, key);
metrics.recordObjectUpload(
@@ -341,16 +341,16 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final SegmentCustomMetadataBuilder customMetadataBuilder)
throws StorageBackendException, IOException {
final String manifest = mapper.writeValueAsString(segmentManifest);
- final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+ final String manifestFileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
final var bytes = uploader.upload(manifestContent, manifestFileKey);
metrics.recordObjectUpload(
remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
- ObjectKey.Suffix.MANIFEST,
+ ObjectKeyFactory.Suffix.MANIFEST,
bytes
);
- customMetadataBuilder.addUploadResult(ObjectKey.Suffix.MANIFEST, bytes);
+ customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, bytes);
log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes);
}
@@ -384,7 +384,7 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);
- final var suffix = ObjectKey.Suffix.LOG;
+ final var suffix = ObjectKeyFactory.Suffix.LOG;
final var segmentKey = objectKey(remoteLogSegmentMetadata, suffix);
return new FetchChunkEnumeration(chunkManager, segmentKey, segmentManifest, range)
@@ -404,7 +404,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
final var segmentManifest = fetchSegmentManifest(remoteLogSegmentMetadata);
- final var key = objectKey(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
+ final var key = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.fromIndexType(indexType));
final var in = fetcher.fetch(key);
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
@@ -425,21 +425,22 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}
- private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final ObjectKey.Suffix suffix) {
+ private String objectKey(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ final ObjectKeyFactory.Suffix suffix) {
final String segmentKey;
if (remoteLogSegmentMetadata.customMetadata().isPresent()) {
final var customMetadataBytes = remoteLogSegmentMetadata.customMetadata().get();
final var fields = customMetadataSerde.deserialize(customMetadataBytes.value());
- segmentKey = objectKey.key(fields, remoteLogSegmentMetadata, suffix);
+ segmentKey = objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix);
} else {
- segmentKey = objectKey.key(remoteLogSegmentMetadata, suffix);
+ segmentKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
}
return segmentKey;
}
private SegmentManifest fetchSegmentManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws StorageBackendException, IOException {
- final String manifestKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
+ final String manifestKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST);
return segmentManifestProvider.get(manifestKey);
}
@@ -455,8 +456,8 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
final long startedMs = time.milliseconds();
try {
- for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
- final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
+ for (final ObjectKeyFactory.Suffix suffix : ObjectKeyFactory.Suffix.values()) {
+ final String key = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
deleter.delete(key);
}
} catch (final Exception e) {
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
index b6176f7b7..88841e53e 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilder.java
@@ -23,25 +23,25 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
public class SegmentCustomMetadataBuilder {
- final ObjectKey objectKey;
+ final ObjectKeyFactory objectKeyFactory;
final RemoteLogSegmentMetadata segmentMetadata;
- final EnumMap uploadResults;
+ final EnumMap uploadResults;
final Set fields;
public SegmentCustomMetadataBuilder(final Set fields,
- final ObjectKey objectKey,
+ final ObjectKeyFactory objectKeyFactory,
final RemoteLogSegmentMetadata segmentMetadata) {
this.fields = fields;
- this.objectKey = objectKey;
+ this.objectKeyFactory = objectKeyFactory;
this.segmentMetadata = segmentMetadata;
- this.uploadResults = new EnumMap<>(ObjectKey.Suffix.class);
+ this.uploadResults = new EnumMap<>(ObjectKeyFactory.Suffix.class);
}
- public SegmentCustomMetadataBuilder addUploadResult(final ObjectKey.Suffix suffix,
+ public SegmentCustomMetadataBuilder addUploadResult(final ObjectKeyFactory.Suffix suffix,
final long bytes) {
if (uploadResults.containsKey(suffix)) {
throw new IllegalArgumentException("Upload results for suffix " + suffix + " already added");
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
index 284cd8bd6..6e63bf0d3 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataField.java
@@ -24,13 +24,13 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
// index define values on custom metadata fields and cannot be changed without breaking compatibility.
public enum SegmentCustomMetadataField {
REMOTE_SIZE(0, new Field("remote_size", Type.VARLONG), SegmentCustomMetadataBuilder::totalSize),
- OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKey.prefix()),
- OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKey.mainPath(b.segmentMetadata));
+ OBJECT_PREFIX(1, new Field("object_prefix", Type.COMPACT_STRING), b -> b.objectKeyFactory.prefix()),
+ OBJECT_KEY(2, new Field("object_key", Type.COMPACT_STRING), b -> ObjectKeyFactory.mainPath(b.segmentMetadata));
static final TaggedFieldsSection FIELDS_SECTION = TaggedFieldsSection.of(
REMOTE_SIZE.index, REMOTE_SIZE.field,
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
index 1a7d22fd9..db680578e 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/Metrics.java
@@ -30,7 +30,7 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,14 +300,14 @@ private void recordSegmentFetchRequests(final TopicPartition topicPartition) {
.record();
}
- public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKey.Suffix suffix,
+ public void recordObjectUpload(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix,
final long bytes) {
recordObjectUploadRequests(topicPartition, suffix);
recordObjectUploadBytes(topicPartition, suffix, bytes);
}
private void recordObjectUploadBytes(final TopicPartition topicPartition,
- final ObjectKey.Suffix suffix,
+ final ObjectKeyFactory.Suffix suffix,
final long bytes) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD_BYTES))
.with(metricsRegistry.objectUploadBytesRate, new Rate())
@@ -348,7 +348,7 @@ private void recordObjectUploadBytes(final TopicPartition topicPartition,
.record(bytes);
}
- private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKey.Suffix suffix) {
+ private void recordObjectUploadRequests(final TopicPartition topicPartition, final ObjectKeyFactory.Suffix suffix) {
new SensorProvider(metrics, sensorName(OBJECT_UPLOAD))
.with(metricsRegistry.objectUploadRequestsRate, new Rate())
.with(metricsRegistry.objectUploadRequestsTotal, new CumulativeCount())
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
index e0d053726..e82efcde6 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/metrics/MetricsRegistry.java
@@ -21,7 +21,7 @@
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.TopicPartition;
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
public class MetricsRegistry {
@@ -241,7 +241,7 @@ public static String sensorName(final String name) {
return name;
}
- public static String sensorNameByObjectType(final ObjectKey.Suffix suffix, final String name) {
+ public static String sensorNameByObjectType(final ObjectKeyFactory.Suffix suffix, final String name) {
return TAG_NAME_OBJECT_TYPE + "." + suffix.value + "." + name;
}
@@ -250,7 +250,7 @@ public static String sensorNameByTopic(final TopicPartition topicPartition, fina
}
public static String sensorNameByTopicAndObjectType(final TopicPartition topicPartition,
- final ObjectKey.Suffix suffix,
+ final ObjectKeyFactory.Suffix suffix,
final String name) {
return TAG_NAME_TOPIC + "." + topicPartition.topic() + "."
+ TAG_NAME_OBJECT_TYPE + "." + suffix.value
@@ -264,7 +264,7 @@ public static String sensorNameByTopicPartition(final TopicPartition topicPartit
}
public static String sensorNameByTopicPartitionAndObjectType(final TopicPartition topicPartition,
- final ObjectKey.Suffix suffix,
+ final ObjectKeyFactory.Suffix suffix,
final String name) {
return TAG_NAME_TOPIC + "." + topicPartition.topic()
+ "." + TAG_NAME_PARTITION + "." + topicPartition.partition()
@@ -277,7 +277,7 @@ static Map topicTags(final TopicPartition topicPartition) {
}
static Map topicAndObjectTypeTags(final TopicPartition topicPartition,
- final ObjectKey.Suffix suffix) {
+ final ObjectKeyFactory.Suffix suffix) {
return Map.of(
TAG_NAME_TOPIC, topicPartition.topic(),
TAG_NAME_OBJECT_TYPE, suffix.value
@@ -292,7 +292,7 @@ static Map topicPartitionTags(final TopicPartition topicPartitio
}
static Map topicPartitionAndObjectTypeTags(final TopicPartition topicPartition,
- final ObjectKey.Suffix suffix) {
+ final ObjectKeyFactory.Suffix suffix) {
return Map.of(
TAG_NAME_TOPIC, topicPartition.topic(),
TAG_NAME_PARTITION, String.valueOf(topicPartition.partition()),
@@ -300,7 +300,7 @@ static Map topicPartitionAndObjectTypeTags(final TopicPartition
);
}
- static Map objectTypeTags(final ObjectKey.Suffix suffix) {
+ static Map objectTypeTags(final ObjectKeyFactory.Suffix suffix) {
return Map.of(TAG_NAME_OBJECT_TYPE, suffix.value);
}
}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java
new file mode 100644
index 000000000..50a4ec5e7
--- /dev/null
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyFactoryTest.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage;
+
+import java.util.Map;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+
+import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ObjectKeyFactoryTest {
+ static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID; // string representation: AAAAAAAAAAAAAAAAAAAAAQ
+ static final Uuid SEGMENT_ID = Uuid.ZERO_UUID; // string representation: AAAAAAAAAAAAAAAAAAAAAA
+ static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7));
+ static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID);
+ static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
+ REMOTE_LOG_SEGMENT_ID, 1234L, 2000L,
+ 0, 0, 0, 0, Map.of(0, 0L));
+
+ @Test
+ void test() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST))
+ .isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+ }
+
+ @Test
+ void withCustomFieldsEmpty() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
+ final Map fields = Map.of();
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ ).isEqualTo(
+ "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+ }
+
+ @Test
+ void withCustomFieldsOnlyPrefix() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
+ final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ ).isEqualTo(
+ "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
+ + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
+ }
+
+ @Test
+ void withCustomFieldsOnlyKey() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
+ final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+ ).isEqualTo("prefix/topic/7/file.log");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)
+ ).isEqualTo("prefix/topic/7/file.index");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)
+ ).isEqualTo("prefix/topic/7/file.timeindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+ ).isEqualTo("prefix/topic/7/file.snapshot");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)
+ ).isEqualTo("prefix/topic/7/file.txnindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+ ).isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ ).isEqualTo("prefix/topic/7/file.rsm-manifest");
+ }
+
+ @Test
+ void withCustomFieldsAll() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory("prefix/");
+ final Map fields = Map.of(
+ SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
+ SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG)
+ ).isEqualTo("other/topic/7/file.log");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.OFFSET_INDEX)
+ ).isEqualTo("other/topic/7/file.index");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TIME_INDEX)
+ ).isEqualTo("other/topic/7/file.timeindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+ ).isEqualTo("other/topic/7/file.snapshot");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.TXN_INDEX)
+ ).isEqualTo("other/topic/7/file.txnindex");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+ ).isEqualTo("other/topic/7/file.leader-epoch-checkpoint");
+ assertThat(
+ objectKeyFactory.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.MANIFEST)
+ ).isEqualTo("other/topic/7/file.rsm-manifest");
+ }
+
+ @Test
+ void nullPrefix() {
+ final ObjectKeyFactory objectKeyFactory = new ObjectKeyFactory(null);
+ assertThat(objectKeyFactory.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKeyFactory.Suffix.LOG))
+ .isEqualTo(
+ "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
+ }
+
+ @Test
+ void suffixForIndexTypes() {
+ assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET))
+ .isEqualTo(ObjectKeyFactory.Suffix.OFFSET_INDEX)
+ .extracting("value")
+ .isEqualTo("index");
+ assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP))
+ .isEqualTo(ObjectKeyFactory.Suffix.TIME_INDEX)
+ .extracting("value")
+ .isEqualTo("timeindex");
+ assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT))
+ .isEqualTo(ObjectKeyFactory.Suffix.PRODUCER_SNAPSHOT)
+ .extracting("value")
+ .isEqualTo("snapshot");
+ assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION))
+ .isEqualTo(ObjectKeyFactory.Suffix.TXN_INDEX)
+ .extracting("value")
+ .isEqualTo("txnindex");
+ assertThat(ObjectKeyFactory.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH))
+ .isEqualTo(ObjectKeyFactory.Suffix.LEADER_EPOCH_CHECKPOINT)
+ .extracting("value")
+ .isEqualTo("leader-epoch-checkpoint");
+ }
+}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java
deleted file mode 100644
index 4ed07a959..000000000
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/ObjectKeyTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2023 Aiven Oy
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.aiven.kafka.tieredstorage;
-
-import java.util.Map;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
-
-import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class ObjectKeyTest {
- static final Uuid TOPIC_ID = Uuid.METADATA_TOPIC_ID; // string representation: AAAAAAAAAAAAAAAAAAAAAQ
- static final Uuid SEGMENT_ID = Uuid.ZERO_UUID; // string representation: AAAAAAAAAAAAAAAAAAAAAA
- static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(TOPIC_ID, new TopicPartition("topic", 7));
- static final RemoteLogSegmentId REMOTE_LOG_SEGMENT_ID = new RemoteLogSegmentId(TOPIC_ID_PARTITION, SEGMENT_ID);
- static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata(
- REMOTE_LOG_SEGMENT_ID, 1234L, 2000L,
- 0, 0, 0, 0, Map.of(0, 0L));
-
- @Test
- void test() {
- final ObjectKey objectKey = new ObjectKey("prefix/");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
- }
-
- @Test
- void withCustomFieldsEmpty() {
- final ObjectKey objectKey = new ObjectKey("prefix/");
- final Map fields = Map.of();
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
- .isEqualTo(
- "prefix/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
- }
-
- @Test
- void withCustomFieldsOnlyPrefix() {
- final ObjectKey objectKey = new ObjectKey("prefix/");
- final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.index");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.timeindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.snapshot");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.txnindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.leader-epoch-checkpoint");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
- .isEqualTo(
- "other/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/"
- + "00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest");
- }
-
- @Test
- void withCustomFieldsOnlyKey() {
- final ObjectKey objectKey = new ObjectKey("prefix/");
- final Map fields = Map.of(SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo("prefix/topic/7/file.log");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
- .isEqualTo("prefix/topic/7/file.index");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
- .isEqualTo("prefix/topic/7/file.timeindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
- .isEqualTo("prefix/topic/7/file.snapshot");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
- .isEqualTo("prefix/topic/7/file.txnindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
- .isEqualTo("prefix/topic/7/file.leader-epoch-checkpoint");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
- .isEqualTo("prefix/topic/7/file.rsm-manifest");
- }
-
- @Test
- void withCustomFieldsAll() {
- final ObjectKey objectKey = new ObjectKey("prefix/");
- final Map fields = Map.of(
- SegmentCustomMetadataField.OBJECT_PREFIX.index(), "other/",
- SegmentCustomMetadataField.OBJECT_KEY.index(), "topic/7/file");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo("other/topic/7/file.log");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.OFFSET_INDEX))
- .isEqualTo("other/topic/7/file.index");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TIME_INDEX))
- .isEqualTo("other/topic/7/file.timeindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.PRODUCER_SNAPSHOT))
- .isEqualTo("other/topic/7/file.snapshot");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.TXN_INDEX))
- .isEqualTo("other/topic/7/file.txnindex");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT))
- .isEqualTo("other/topic/7/file.leader-epoch-checkpoint");
- assertThat(objectKey.key(fields, REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.MANIFEST))
- .isEqualTo("other/topic/7/file.rsm-manifest");
- }
-
- @Test
- void nullPrefix() {
- final ObjectKey objectKey = new ObjectKey(null);
- assertThat(objectKey.key(REMOTE_LOG_SEGMENT_METADATA, ObjectKey.Suffix.LOG))
- .isEqualTo(
- "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000001234-AAAAAAAAAAAAAAAAAAAAAA.log");
- }
-
- @Test
- void suffixForIndexTypes() {
- assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.OFFSET))
- .isEqualTo(ObjectKey.Suffix.OFFSET_INDEX)
- .extracting("value")
- .isEqualTo("index");
- assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TIMESTAMP))
- .isEqualTo(ObjectKey.Suffix.TIME_INDEX)
- .extracting("value")
- .isEqualTo("timeindex");
- assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT))
- .isEqualTo(ObjectKey.Suffix.PRODUCER_SNAPSHOT)
- .extracting("value")
- .isEqualTo("snapshot");
- assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.TRANSACTION))
- .isEqualTo(ObjectKey.Suffix.TXN_INDEX)
- .extracting("value")
- .isEqualTo("txnindex");
- assertThat(ObjectKey.Suffix.fromIndexType(RemoteStorageManager.IndexType.LEADER_EPOCH))
- .isEqualTo(ObjectKey.Suffix.LEADER_EPOCH_CHECKPOINT)
- .extracting("value")
- .isEqualTo("leader-epoch-checkpoint");
- }
-}
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java
index ba894af96..825d30374 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java
@@ -153,7 +153,7 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J
assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-rate"))
.isEqualTo(1575.0 / METRIC_TIME_WINDOW_SEC);
- for (final var suffix : ObjectKey.Suffix.values()) {
+ for (final var suffix : ObjectKeyFactory.Suffix.values()) {
final ObjectName storageMetricsName = ObjectName.getInstance(objectName + ",object-type=" + suffix.value);
switch (suffix) {
case TXN_INDEX:
diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java
index 74bc9ec99..2a8ce1af9 100644
--- a/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java
+++ b/core/src/test/java/io/aiven/kafka/tieredstorage/metadata/SegmentCustomMetadataBuilderTest.java
@@ -25,7 +25,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import io.aiven.kafka.tieredstorage.ObjectKey;
+import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
import org.junit.jupiter.api.Test;
@@ -43,24 +43,24 @@ class SegmentCustomMetadataBuilderTest {
SEGMENT_ID),
1, 100, -1, -1, 1L,
100, Collections.singletonMap(1, 100L));
- static final ObjectKey OBJECT_KEY = new ObjectKey("p1");
+ static final ObjectKeyFactory OBJECT_KEY_FACTORY = new ObjectKeyFactory("p1");
@Test
void shouldBuildEmptyMap() {
- final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA);
+ final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA);
assertThat(b.build()).isEmpty();
// even when upload results are added
- b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L);
+ b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L);
assertThat(b.build()).isEmpty();
}
@Test
void shouldFailWhenAddingExistingSuffixUploadResult() {
- final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA);
+ final var b = new SegmentCustomMetadataBuilder(Set.of(), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA);
- b.addUploadResult(ObjectKey.Suffix.MANIFEST, 10L);
- assertThatThrownBy(() -> b.addUploadResult(ObjectKey.Suffix.MANIFEST, 20L))
+ b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 10L);
+ assertThatThrownBy(() -> b.addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 20L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Upload results for suffix MANIFEST already added");
}
@@ -68,15 +68,15 @@ void shouldFailWhenAddingExistingSuffixUploadResult() {
@Test
void shouldIncludeTotalSize() {
final var field = SegmentCustomMetadataField.REMOTE_SIZE;
- final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA);
+ final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA);
var fields = b.build();
assertThat(fields)
.containsExactly(entry(field.index, 0L)); // i.e. no upload results
// but, when existing upload results
b
- .addUploadResult(ObjectKey.Suffix.LOG, 40L)
- .addUploadResult(ObjectKey.Suffix.MANIFEST, 2L);
+ .addUploadResult(ObjectKeyFactory.Suffix.LOG, 40L)
+ .addUploadResult(ObjectKeyFactory.Suffix.MANIFEST, 2L);
fields = b.build();
assertThat(fields)
.containsExactly(entry(field.index, 42L)); // i.e. sum
@@ -85,7 +85,7 @@ void shouldIncludeTotalSize() {
@Test
void shouldIncludeObjectPrefix() {
final var field = SegmentCustomMetadataField.OBJECT_PREFIX;
- final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA);
+ final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA);
final var fields = b.build();
assertThat(fields)
.containsExactly(entry(field.index, "p1"));
@@ -94,7 +94,7 @@ void shouldIncludeObjectPrefix() {
@Test
void shouldIncludeObjectKey() {
final var field = SegmentCustomMetadataField.OBJECT_KEY;
- final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY, REMOTE_LOG_SEGMENT_METADATA);
+ final var b = new SegmentCustomMetadataBuilder(Set.of(field), OBJECT_KEY_FACTORY, REMOTE_LOG_SEGMENT_METADATA);
final var fields = b.build();
assertThat(fields)
.containsExactly(entry(field.index, "topic-" + TOPIC_ID + "/0/00000000000000000001-" + SEGMENT_ID));