diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 44ee1bfa4..e93273282 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -17,7 +17,6 @@ package io.aiven.kafka.tieredstorage; import javax.crypto.Cipher; -import javax.crypto.SecretKey; import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; @@ -55,8 +54,8 @@ import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache; import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer; +import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule; +import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField; import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataSerde; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; @@ -66,7 +65,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.github.luben.zstd.Zstd; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; @@ -328,6 +326,7 @@ private void checkEncryption(final boolean compression) throws IOException { // 3. The AAD is used. final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(KafkaTypeSerdeModule.create()); final JsonNode manifest = objectMapper.readTree(new File(targetDir.toString(), TARGET_MANIFEST_FILE)); final String dataKeyText = manifest.get("encryption").get("dataKey").asText(); @@ -338,13 +337,8 @@ private void checkEncryption(final boolean compression) throws IOException { final byte[] dataKey = rsaEncryptionProvider.decryptDataKey( new EncryptedDataKey(KEY_ENCRYPTION_KEY_ID, encryptedDataKey)); final byte[] aad = manifest.get("encryption").get("aad").binaryValue(); + objectMapper.registerModule(EncryptionSerdeModule.create(rsaEncryptionProvider)); - final SimpleModule simpleModule = new SimpleModule(); - simpleModule.addSerializer(SecretKey.class, - new DataKeySerializer(rsaEncryptionProvider::encryptDataKey)); - simpleModule.addDeserializer(SecretKey.class, - new DataKeyDeserializer(rsaEncryptionProvider::decryptDataKey)); - objectMapper.registerModule(simpleModule); final ChunkIndex chunkIndex = objectMapper.treeToValue(manifest.get("chunkIndex"), ChunkIndex.class); try (final InputStream originalInputStream = Files.newInputStream(logFilePath); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index fa174378c..8a5a5f7b0 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -16,8 +16,6 @@ package io.aiven.kafka.tieredstorage; -import javax.crypto.SecretKey; - import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -52,8 +50,8 @@ import io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider; import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer; +import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule; +import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataBuilder; import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField; import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataSerde; @@ -81,7 +79,6 @@ import io.aiven.kafka.tieredstorage.transform.TransformFinisher; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,13 +177,9 @@ void setStorage(final StorageBackend storage) { private ObjectMapper getObjectMapper() { final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.registerModule(new Jdk8Module()); + objectMapper.registerModule(KafkaTypeSerdeModule.create()); if (encryptionEnabled) { - final SimpleModule simpleModule = new SimpleModule(); - simpleModule.addSerializer(SecretKey.class, - new DataKeySerializer(rsaEncryptionProvider::encryptDataKey)); - simpleModule.addDeserializer(SecretKey.class, - new DataKeyDeserializer(rsaEncryptionProvider::decryptDataKey)); - objectMapper.registerModule(simpleModule); + objectMapper.registerModule(EncryptionSerdeModule.create(rsaEncryptionProvider)); } return objectMapper; } @@ -229,7 +222,7 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat final ChunkIndex chunkIndex = transformFinisher.chunkIndex(); final SegmentManifest segmentManifest = - new SegmentManifestV1(chunkIndex, requiresCompression, encryptionMetadata); + new SegmentManifestV1(chunkIndex, requiresCompression, encryptionMetadata, remoteLogSegmentMetadata); uploadManifest(remoteLogSegmentMetadata, segmentManifest, customMetadataBuilder); final InputStream offsetIndex = Files.newInputStream(logSegmentData.offsetIndex()); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifest.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifest.java index e5dae196f..729cc92f4 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifest.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifest.java @@ -18,6 +18,8 @@ import java.util.Optional; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -38,4 +40,6 @@ public interface SegmentManifest { boolean compression(); Optional encryption(); + + RemoteLogSegmentMetadata remoteLogSegmentMetadata(); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java index 938458370..7097d4fce 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java @@ -19,6 +19,8 @@ import java.util.Objects; import java.util.Optional; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; import com.fasterxml.jackson.annotation.JsonCreator; @@ -29,6 +31,7 @@ public class SegmentManifestV1 implements SegmentManifest { private final ChunkIndex chunkIndex; private final boolean compression; private final SegmentEncryptionMetadataV1 encryption; + private final RemoteLogSegmentMetadata remoteLogSegmentMetadata; @JsonCreator public SegmentManifestV1(@JsonProperty(value = "chunkIndex", required = true) @@ -37,10 +40,19 @@ public SegmentManifestV1(@JsonProperty(value = "chunkIndex", required = true) final boolean compression, @JsonProperty("encryption") final SegmentEncryptionMetadataV1 encryption) { + this(chunkIndex, compression, encryption, null); + } + + public SegmentManifestV1(final ChunkIndex chunkIndex, + final boolean compression, + final SegmentEncryptionMetadataV1 encryption, + final RemoteLogSegmentMetadata remoteLogSegmentMetadata) { this.chunkIndex = Objects.requireNonNull(chunkIndex, "chunkIndex cannot be null"); this.compression = compression; this.encryption = encryption; + + this.remoteLogSegmentMetadata = remoteLogSegmentMetadata; } @Override @@ -62,6 +74,13 @@ public Optional encryption() { return Optional.ofNullable(encryption); } + @Override + // We don't need to deserialize it + @JsonProperty(value = "remoteLogSegmentMetadata", access = JsonProperty.Access.READ_ONLY) + public RemoteLogSegmentMetadata remoteLogSegmentMetadata() { + return remoteLogSegmentMetadata; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -79,6 +98,7 @@ public boolean equals(final Object o) { if (!chunkIndex.equals(that.chunkIndex)) { return false; } + // We don't want remoteLogSegmentMetadata to participate in hash code and equality checks. return Objects.equals(encryption, that.encryption); } @@ -87,6 +107,7 @@ public int hashCode() { int result = chunkIndex.hashCode(); result = 31 * result + (compression ? 1 : 0); result = 31 * result + (encryption != null ? encryption.hashCode() : 0); + // We don't want remoteLogSegmentMetadata to participate in hash code and equality checks. return result; } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeyDeserializer.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeyDeserializer.java index b09584bf6..8dcc6a7b1 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeyDeserializer.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeyDeserializer.java @@ -30,10 +30,10 @@ import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -public class DataKeyDeserializer extends StdDeserializer { +class DataKeyDeserializer extends StdDeserializer { private final Function keyDecryptor; - public DataKeyDeserializer(final Function keyDecryptor) { + DataKeyDeserializer(final Function keyDecryptor) { super(SecretKey.class); this.keyDecryptor = Objects.requireNonNull(keyDecryptor, "keyDecryptor cannot be null"); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeySerializer.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeySerializer.java index e42cc79ce..fdd397212 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeySerializer.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/DataKeySerializer.java @@ -28,10 +28,10 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; -public class DataKeySerializer extends StdSerializer { +class DataKeySerializer extends StdSerializer { private final Function dataKeyEncryptor; - public DataKeySerializer(final Function dataKeyEncryptor) { + DataKeySerializer(final Function dataKeyEncryptor) { super(SecretKey.class); this.dataKeyEncryptor = Objects.requireNonNull(dataKeyEncryptor, "dataKeyEncryptor cannot be null"); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/EncryptionSerdeModule.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/EncryptionSerdeModule.java new file mode 100644 index 000000000..18ad75b02 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/EncryptionSerdeModule.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.manifest.serde; + +import javax.crypto.SecretKey; + +import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; + +public final class EncryptionSerdeModule { + public static Module create(final RsaEncryptionProvider rsaEncryptionProvider) { + final var module = new SimpleModule(); + + module.addSerializer(SecretKey.class, + new DataKeySerializer(rsaEncryptionProvider::encryptDataKey)); + module.addDeserializer(SecretKey.class, + new DataKeyDeserializer(rsaEncryptionProvider::decryptDataKey)); + + return module; + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/KafkaTypeSerdeModule.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/KafkaTypeSerdeModule.java new file mode 100644 index 000000000..3bb948ec2 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/serde/KafkaTypeSerdeModule.java @@ -0,0 +1,116 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.manifest.serde; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +public class KafkaTypeSerdeModule { + public static Module create() { + final var module = new SimpleModule(); + + module.addSerializer(Uuid.class, new UuidSerializer()); + + module.setMixInAnnotation(TopicPartition.class, TopicPartitionSerdeMixin.class); + module.setMixInAnnotation(TopicIdPartition.class, TopicIdPartitionSerdeMixin.class); + module.setMixInAnnotation(RemoteLogSegmentId.class, RemoteLogSegmentIdMixin.class); + module.setMixInAnnotation(RemoteLogSegmentMetadata.class, RemoteLogSegmentMetadataMixin.class); + + return module; + } + + private static class UuidSerializer extends StdSerializer { + UuidSerializer() { + super(Uuid.class); + } + + @Override + public void serialize(final Uuid value, + final JsonGenerator gen, + final SerializerProvider provider) throws IOException { + gen.writeString(value.toString()); + } + } + + @JsonPropertyOrder({ "topic", "partition" }) + private abstract static class TopicPartitionSerdeMixin { + @JsonProperty("topic") + public abstract String topic(); + + @JsonProperty("partition") + public abstract int partition(); + } + + @JsonPropertyOrder({ "topicId", "topicPartition" }) + private abstract static class TopicIdPartitionSerdeMixin { + @JsonProperty("topicId") + public abstract Uuid topicId(); + + @JsonProperty("topicPartition") + public abstract TopicPartition topicPartition(); + } + + @JsonPropertyOrder({ "topicIdPartition", "id" }) + private abstract static class RemoteLogSegmentIdMixin { + @JsonProperty("topicIdPartition") + abstract TopicIdPartition topicIdPartition(); + + @JsonProperty("id") + abstract Uuid id(); + } + + @JsonPropertyOrder({ + "remoteLogSegmentId", "startOffset", "endOffset", + "maxTimestampMs", "brokerId", "eventTimestampMs", "segmentLeaderEpochs" + }) + private abstract static class RemoteLogSegmentMetadataMixin { + @JsonProperty("remoteLogSegmentId") + public abstract RemoteLogSegmentId remoteLogSegmentId(); + + @JsonProperty("startOffset") + public abstract long startOffset(); + + @JsonProperty("endOffset") + public abstract long endOffset(); + + @JsonProperty("maxTimestampMs") + public abstract long maxTimestampMs(); + + @JsonProperty("brokerId") + public abstract int brokerId(); + + @JsonProperty("eventTimestampMs") + public abstract long eventTimestampMs(); + + @JsonProperty("segmentLeaderEpochs") + public abstract Map segmentLeaderEpochs(); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java index 24029a8f1..ba894af96 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerMetricsTest.java @@ -149,9 +149,9 @@ void metricsShouldBeReported(final String tags) throws RemoteStorageException, J .isEqualTo(18.0 / METRIC_TIME_WINDOW_SEC); assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-total")) - .isEqualTo(657.0); + .isEqualTo(1575.0); assertThat(MBEAN_SERVER.getAttribute(metricName, "object-upload-bytes-rate")) - .isEqualTo(657.0 / METRIC_TIME_WINDOW_SEC); + .isEqualTo(1575.0 / METRIC_TIME_WINDOW_SEC); for (final var suffix : ObjectKey.Suffix.values()) { final ObjectName storageMetricsName = ObjectName.getInstance(objectName + ",object-type=" + suffix.value); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index af8adc6ae..6335a8763 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -51,7 +51,7 @@ class DefaultChunkManagerTest extends AesKeyAwareTest { void testGetChunk() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); @@ -77,7 +77,7 @@ void testGetChunkWithEncryption() throws Exception { new ByteArrayInputStream(encrypted)); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, - new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad)); + new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad), null); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); @@ -97,7 +97,7 @@ void testGetChunkWithCompression() throws Exception { when(storage.fetch(OBJECT_KEY_PATH, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, true, null, null); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); assertThat(chunkManager.getChunk(OBJECT_KEY_PATH, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java index 67adca676..f759895b7 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java @@ -64,7 +64,8 @@ class ChunkCacheTest { private static final byte[] CHUNK_0 = "0123456789".getBytes(); private static final byte[] CHUNK_1 = "1011121314".getBytes(); private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10); - private static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, false, null); + private static final SegmentManifest SEGMENT_MANIFEST = + new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, false, null, null); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; private static final String SEGMENT_KEY = "topic/segment"; @Mock diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java index 5fb4f2a1d..b81e3fdd6 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.ForkJoinPool; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; +import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -51,6 +52,7 @@ class SegmentManifestProviderTest { static { MAPPER.registerModule(new Jdk8Module()); + MAPPER.registerModule(KafkaTypeSerdeModule.create()); } static final String MANIFEST = @@ -94,7 +96,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException { .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); final SegmentManifestV1 expectedManifest = new SegmentManifestV1( new FixedSizeChunkIndex(100, 1000, 110, 110), - false, null + false, null, null ); assertThat(provider.get(key)).isEqualTo(expectedManifest); verify(storage).fetch(key); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java index 03b0798a0..0fc4909a5 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java @@ -20,17 +20,22 @@ import javax.crypto.spec.SecretKeySpec; import java.util.Base64; +import java.util.Map; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import io.aiven.kafka.tieredstorage.RsaKeyAwareTest; import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer; -import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer; +import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule; +import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule; import io.aiven.kafka.tieredstorage.security.EncryptedDataKey; import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import org.junit.jupiter.api.BeforeEach; @@ -44,16 +49,43 @@ class SegmentManifestV1SerdeTest extends RsaKeyAwareTest { static final SecretKey DATA_KEY = new SecretKeySpec(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, "AES"); static final byte[] AAD = {10, 11, 12, 13}; + static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata( + new RemoteLogSegmentId( + new TopicIdPartition(Uuid.fromString("lZ6vvmajTWKDBUTV6SQAtQ"), 42, "topic1"), + Uuid.fromString("adh9f8BMS4anaUnD8KWfWg") + ), + 0, + 1000L, + 1000000000L, + 2, + 2000000000L, + 100500, + Map.of(0, 100L, 1, 200L, 2, 300L) + ); + + static final String REMOTE_LOG_SEGMENT_METADATA_JSON = "{" + + "\"remoteLogSegmentId\":{" + + "\"topicIdPartition\":{\"topicId\":\"lZ6vvmajTWKDBUTV6SQAtQ\",\"topicPartition\":" + + "{\"topic\":\"topic1\",\"partition\":42}}," + + "\"id\":\"adh9f8BMS4anaUnD8KWfWg\"}," + + "\"startOffset\":0,\"endOffset\":1000," + + "\"maxTimestampMs\":1000000000,\"brokerId\":2,\"eventTimestampMs\":2000000000," + + "\"segmentLeaderEpochs\":{\"0\":100,\"1\":200,\"2\":300}}"; + static final String WITH_ENCRYPTION_WITHOUT_SECRET_KEY_JSON = "{\"version\":\"1\"," + "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100," + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110}," - + "\"compression\":false,\"encryption\":{\"aad\":\"CgsMDQ==\"}}"; + + "\"compression\":false,\"encryption\":{\"aad\":\"CgsMDQ==\"},\"remoteLogSegmentMetadata\":" + + REMOTE_LOG_SEGMENT_METADATA_JSON + + "}"; static final String WITHOUT_ENCRYPTION_JSON = "{\"version\":\"1\"," + "\"chunkIndex\":{\"type\":\"fixed\",\"originalChunkSize\":100," + "\"originalFileSize\":1000,\"transformedChunkSize\":110,\"finalTransformedChunkSize\":110}," - + "\"compression\":false}"; + + "\"compression\":false,\"remoteLogSegmentMetadata\":" + + REMOTE_LOG_SEGMENT_METADATA_JSON + + "}"; ObjectMapper mapper; RsaEncryptionProvider rsaEncryptionProvider; @@ -62,21 +94,16 @@ class SegmentManifestV1SerdeTest extends RsaKeyAwareTest { void init() { mapper = new ObjectMapper(); mapper.registerModule(new Jdk8Module()); + mapper.registerModule(KafkaTypeSerdeModule.create()); rsaEncryptionProvider = new RsaEncryptionProvider(KEY_ENCRYPTION_KEY_ID, keyRing); - - final SimpleModule simpleModule = new SimpleModule(); - simpleModule.addSerializer(SecretKey.class, - new DataKeySerializer(rsaEncryptionProvider::encryptDataKey)); - simpleModule.addDeserializer(SecretKey.class, - new DataKeyDeserializer(rsaEncryptionProvider::decryptDataKey)); - mapper.registerModule(simpleModule); + mapper.registerModule(EncryptionSerdeModule.create(rsaEncryptionProvider)); } @Test void withEncryption() throws JsonProcessingException { final SegmentManifest manifest = new SegmentManifestV1(INDEX, false, - new SegmentEncryptionMetadataV1(DATA_KEY, AAD)); + new SegmentEncryptionMetadataV1(DATA_KEY, AAD), REMOTE_LOG_SEGMENT_METADATA); final String jsonStr = mapper.writeValueAsString(manifest); @@ -102,7 +129,7 @@ void withEncryption() throws JsonProcessingException { @Test void withoutEncryption() throws JsonProcessingException { - final SegmentManifest manifest = new SegmentManifestV1(INDEX, false, null); + final SegmentManifest manifest = new SegmentManifestV1(INDEX, false, null, REMOTE_LOG_SEGMENT_METADATA); final String jsonStr = mapper.writeValueAsString(manifest); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index 1749656c3..9fb048a53 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -62,7 +62,7 @@ class FetchChunkEnumerationSourceInputStreamClosingTest { static final FixedSizeChunkIndex CHUNK_INDEX = new FixedSizeChunkIndex( CHUNK_SIZE, CHUNK_SIZE * 3, CHUNK_SIZE, CHUNK_SIZE); static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1( - CHUNK_INDEX, false, null); + CHUNK_INDEX, false, null, null); TestObjectFetcher fetcher; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index e767b948e..c37b9987a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -41,7 +41,7 @@ class FetchChunkEnumerationTest { DefaultChunkManager chunkManager; final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 100, 10, 100); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); static final String SEGMENT_KEY_PATH = "topic/segment"; @@ -52,7 +52,7 @@ class FetchChunkEnumerationTest { @Test void failsWhenLargerStartPosition() { // Given - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null); + final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null, null); // When final int from = 1000; final int to = from + 1;