Skip to content

Commit

Permalink
feat: validate index size when transforming
Browse files Browse the repository at this point in the history
Index size can potentially be zero or too large for transformation to work.
  • Loading branch information
jeqo committed Oct 20, 2023
1 parent a67b4d2 commit 3b4bf26
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,6 +56,8 @@

import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule;
Expand All @@ -79,6 +82,7 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -465,6 +469,79 @@ void testRequiresCompression(final CompressionType compressionType, final boolea
assertThat(requires).isEqualTo(expectedResult);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingIndexes(final boolean encryption) {
    // Small chunk size so the transformation pipeline is actually exercised;
    // encryption is toggled by the parameter.
    final var config = new HashMap<>(Map.of(
        "chunk.size", "10",
        "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
        "storage.root", targetDir.toString(),
        "encryption.enabled", Boolean.toString(encryption)
    ));
    final SegmentEncryptionMetadataV1 encryptionMetadata;
    if (encryption) {
        config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
        config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
        config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
        config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
        final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
        encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
    } else {
        encryptionMetadata = null;
    }
    rsm.configure(config);

    final var segmentIndexBuilder = new SegmentIndexesV1Builder();
    // Explicit charset: a bare getBytes() depends on the platform default
    // charset (Error Prone: DefaultCharset). UTF-8 is identical for this
    // ASCII literal but keeps the test deterministic across platforms.
    final var bytes = "test".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // A non-empty index must produce a non-empty transformed stream.
    assertThat(rsm.transformIndex(
        IndexType.OFFSET,
        new ByteArrayInputStream(bytes),
        bytes.length,
        encryptionMetadata,
        segmentIndexBuilder
    )).isNotEmpty();
}


@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingEmptyIndexes(final boolean encryption) {
final var config = new HashMap<>(Map.of(
"chunk.size", "10",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString(),
"encryption.enabled", Boolean.toString(encryption)
));
SegmentEncryptionMetadataV1 encryptionMetadata = null;
if (encryption) {
config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
}
rsm.configure(config);

final var segmentIndexBuilder = new SegmentIndexesV1Builder();
final var is = rsm.transformIndex(
IndexType.OFFSET,
InputStream.nullInputStream(),
0,
encryptionMetadata,
segmentIndexBuilder
);
assertThat(is).isEmpty();
}

@Test
void testGetIndexSizeWithInvalidPaths() {
    // Sizing a path that does not exist must surface as a RemoteStorageException
    // carrying the fixed error message.
    final var missingPath = Path.of("non-exist");
    assertThatThrownBy(() -> RemoteStorageManager.indexSize(missingPath))
        .isInstanceOf(RemoteStorageException.class)
        .hasMessage("Error while getting index path size");
}

@Test
void testFetchingSegmentFileNonExistent() throws IOException {
final var config = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -247,37 +248,37 @@ private SegmentIndexesV1 uploadIndexes(
final LogSegmentData segmentData,
final SegmentEncryptionMetadataV1 encryptionMeta,
final SegmentCustomMetadataBuilder customMetadataBuilder
) throws IOException, StorageBackendException {
) throws IOException, RemoteStorageException, StorageBackendException {
final List<InputStream> indexes = new ArrayList<>(IndexType.values().length);
final SegmentIndexesV1Builder segmentIndexBuilder = new SegmentIndexesV1Builder();
final var offsetIndex = transformIndex(
IndexType.OFFSET,
Files.newInputStream(segmentData.offsetIndex()),
(int) Files.size(segmentData.offsetIndex()),
indexSize(segmentData.offsetIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(offsetIndex);
final var timeIndex = transformIndex(
IndexType.TIMESTAMP,
Files.newInputStream(segmentData.timeIndex()),
(int) Files.size(segmentData.timeIndex()),
indexSize(segmentData.timeIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(timeIndex);
final var producerSnapshotIndex = transformIndex(
IndexType.PRODUCER_SNAPSHOT,
Files.newInputStream(segmentData.producerSnapshotIndex()),
(int) Files.size(segmentData.producerSnapshotIndex()),
indexSize(segmentData.producerSnapshotIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(producerSnapshotIndex);
final var leaderEpoch = transformIndex(
IndexType.LEADER_EPOCH,
new ByteBufferInputStream(segmentData.leaderEpochIndex()),
segmentData.leaderEpochIndex().capacity(),
segmentData.leaderEpochIndex().remaining(),
encryptionMeta,
segmentIndexBuilder
);
Expand All @@ -286,7 +287,7 @@ private SegmentIndexesV1 uploadIndexes(
final var transactionIndex = transformIndex(
IndexType.TRANSACTION,
Files.newInputStream(segmentData.transactionIndex().get()),
(int) Files.size(segmentData.transactionIndex().get()),
indexSize(segmentData.transactionIndex().get()),
encryptionMeta,
segmentIndexBuilder
);
Expand All @@ -308,6 +309,21 @@ private SegmentIndexesV1 uploadIndexes(
return segmentIndexBuilder.build();
}

/**
 * Returns the size in bytes of the index file at {@code indexPath} as an {@code int}.
 *
 * @param indexPath path of the index file to size
 * @return the file size, guaranteed to fit in an {@code int}
 * @throws RemoteStorageException if reading the file size fails (e.g. the path does not exist)
 * @throws IllegalStateException if the file is larger than {@link Integer#MAX_VALUE}; this is
 *     not an {@code IOException}, so it propagates unwrapped
 */
static int indexSize(final Path indexPath) throws RemoteStorageException {
    final long sizeInBytes;
    try {
        sizeInBytes = Files.size(indexPath);
    } catch (final IOException e) {
        throw new RemoteStorageException("Error while getting index path size", e);
    }
    // Downcast is safe only within the int range checked above.
    if (sizeInBytes <= Integer.MAX_VALUE) {
        return (int) sizeInBytes;
    }
    throw new IllegalStateException(
        "Index at path "
            + indexPath
            + " has size larger than Integer.MAX_VALUE");
}

private Optional<CustomMetadata> buildCustomMetadata(final SegmentCustomMetadataBuilder customMetadataBuilder) {
final var customFields = customMetadataBuilder.build();
if (!customFields.isEmpty()) {
Expand Down Expand Up @@ -355,11 +371,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}

private InputStream transformIndex(final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder) {
InputStream transformIndex(final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder) {
log.debug("Transforming index {} with size {}", indexType, size);
if (size == 0) {
return InputStream.nullInputStream();
}
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size);
if (encryptionEnabled) {
final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad());
Expand Down

0 comments on commit 3b4bf26

Please sign in to comment.