diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
index 6b1b7f87d..bb19ea982 100644
--- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
+++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java
@@ -20,6 +20,7 @@
 import javax.crypto.spec.GCMParameterSpec;
 import javax.crypto.spec.SecretKeySpec;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -55,6 +56,8 @@
 
 import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
 import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache;
+import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
+import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
 import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
 import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
 import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule;
@@ -79,6 +82,7 @@
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -465,6 +469,79 @@ void testRequiresCompression(final CompressionType compressionType, final boolea
         assertThat(requires).isEqualTo(expectedResult);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testTransformingIndexes(final boolean encryption) {
+        final var config = new HashMap<>(Map.of(
+            "chunk.size", "10",
+            "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
+            "storage.root", targetDir.toString(),
+            "encryption.enabled", Boolean.toString(encryption)
+        ));
+        final SegmentEncryptionMetadataV1 encryptionMetadata;
+        if (encryption) {
+            config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
+            config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
+            config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
+            config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
+            final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
+            encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+        } else {
+            encryptionMetadata = null;
+        }
+        rsm.configure(config);
+
+        final var segmentIndexBuilder = new SegmentIndexesV1Builder();
+        final var bytes = "test".getBytes();
+        final var is = rsm.transformIndex(
+            IndexType.OFFSET,
+            new ByteArrayInputStream(bytes),
+            bytes.length,
+            encryptionMetadata,
+            segmentIndexBuilder
+        );
+        assertThat(is).isNotEmpty();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testTransformingEmptyIndexes(final boolean encryption) {
+        final var config = new HashMap<>(Map.of(
+            "chunk.size", "10",
+            "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
+            "storage.root", targetDir.toString(),
+            "encryption.enabled", Boolean.toString(encryption)
+        ));
+        SegmentEncryptionMetadataV1 encryptionMetadata = null;
+        if (encryption) {
+            config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
+            config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
+            config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
+            config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
+            final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
+            encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
+        }
+        rsm.configure(config);
+
+        final var segmentIndexBuilder = new SegmentIndexesV1Builder();
+        final var is = rsm.transformIndex(
+            IndexType.OFFSET,
+            InputStream.nullInputStream(),
+            0,
+            encryptionMetadata,
+            segmentIndexBuilder
+        );
+        assertThat(is).isEmpty();
+    }
+
+    @Test
+    void testGetIndexSizeWithInvalidPaths() {
+        // non existing file
+        assertThatThrownBy(() -> RemoteStorageManager.indexSize(Path.of("non-exist")))
+            .hasMessage("Error while getting index path size")
+            .isInstanceOf(RemoteStorageException.class);
+    }
+
     @Test
     void testFetchingSegmentFileNonExistent() throws IOException {
         final var config = Map.of(
diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
index 87a6c82b6..51961879f 100644
--- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
+++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java
@@ -22,6 +22,7 @@
 import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.KeyPair;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -247,13 +248,13 @@ private SegmentIndexesV1 uploadIndexes(
         final LogSegmentData segmentData,
         final SegmentEncryptionMetadataV1 encryptionMeta,
         final SegmentCustomMetadataBuilder customMetadataBuilder
-    ) throws IOException, StorageBackendException {
+    ) throws IOException, RemoteStorageException, StorageBackendException {
         final List indexes = new ArrayList<>(IndexType.values().length);
         final SegmentIndexesV1Builder segmentIndexBuilder = new SegmentIndexesV1Builder();
         final var offsetIndex = transformIndex(
             IndexType.OFFSET,
             Files.newInputStream(segmentData.offsetIndex()),
-            (int) Files.size(segmentData.offsetIndex()),
+            indexSize(segmentData.offsetIndex()),
             encryptionMeta,
             segmentIndexBuilder
         );
@@ -261,7 +262,7 @@ private SegmentIndexesV1 uploadIndexes(
         final var timeIndex = transformIndex(
             IndexType.TIMESTAMP,
             Files.newInputStream(segmentData.timeIndex()),
-            (int) Files.size(segmentData.timeIndex()),
+            indexSize(segmentData.timeIndex()),
             encryptionMeta,
             segmentIndexBuilder
         );
@@ -269,7 +270,7 @@ private SegmentIndexesV1 uploadIndexes(
         final var producerSnapshotIndex = transformIndex(
             IndexType.PRODUCER_SNAPSHOT,
             Files.newInputStream(segmentData.producerSnapshotIndex()),
-            (int) Files.size(segmentData.producerSnapshotIndex()),
+            indexSize(segmentData.producerSnapshotIndex()),
             encryptionMeta,
             segmentIndexBuilder
         );
@@ -277,7 +278,7 @@ private SegmentIndexesV1 uploadIndexes(
         final var leaderEpoch = transformIndex(
             IndexType.LEADER_EPOCH,
             new ByteBufferInputStream(segmentData.leaderEpochIndex()),
-            segmentData.leaderEpochIndex().capacity(),
+            segmentData.leaderEpochIndex().remaining(),
             encryptionMeta,
             segmentIndexBuilder
         );
@@ -286,7 +287,7 @@ private SegmentIndexesV1 uploadIndexes(
             final var transactionIndex = transformIndex(
                 IndexType.TRANSACTION,
                 Files.newInputStream(segmentData.transactionIndex().get()),
-                (int) Files.size(segmentData.transactionIndex().get()),
+                indexSize(segmentData.transactionIndex().get()),
                 encryptionMeta,
                 segmentIndexBuilder
             );
@@ -308,6 +309,27 @@ private SegmentIndexesV1 uploadIndexes(
         return segmentIndexBuilder.build();
     }
 
+    /**
+     * Returns the size in bytes of the index file at {@code indexPath} as an {@code int}.
+     *
+     * @throws RemoteStorageException if the file size cannot be read
+     * @throws IllegalStateException if the file is larger than {@link Integer#MAX_VALUE} bytes
+     */
+    static int indexSize(final Path indexPath) throws RemoteStorageException {
+        try {
+            final var size = Files.size(indexPath);
+            if (size > Integer.MAX_VALUE) {
+                throw new IllegalStateException(
+                    "Index at path "
+                        + indexPath
+                        + " has size larger than Integer.MAX_VALUE");
+            }
+            return (int) size;
+        } catch (final IOException e) {
+            throw new RemoteStorageException("Error while getting index path size", e);
+        }
+    }
+
     private Optional buildCustomMetadata(final SegmentCustomMetadataBuilder customMetadataBuilder) {
         final var customFields = customMetadataBuilder.build();
         if (!customFields.isEmpty()) {
@@ -355,11 +377,21 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
         }
     }
 
-    private InputStream transformIndex(final IndexType indexType,
-                                       final InputStream index,
-                                       final int size,
-                                       final SegmentEncryptionMetadata encryptionMetadata,
-                                       final SegmentIndexesV1Builder segmentIndexBuilder) {
+    InputStream transformIndex(final IndexType indexType,
+                               final InputStream index,
+                               final int size,
+                               final SegmentEncryptionMetadata encryptionMetadata,
+                               final SegmentIndexesV1Builder segmentIndexBuilder) {
+        log.debug("Transforming index {} with size {}", indexType, size);
+        if (size == 0) {
+            // The early return drops the caller's stream, so release it here instead of leaking it.
+            try {
+                index.close();
+            } catch (final IOException e) {
+                log.warn("Failed to close empty index stream for {}", indexType, e);
+            }
+            return InputStream.nullInputStream();
+        }
         TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size);
         if (encryptionEnabled) {
             final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad());