Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: validate index size when transforming #422

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if statements based on parameterized test arguments are not the best approach and raise a question about the overall feasibility of these tests being parameterized, but since we had this pattern here earlier, let's address it later altogether.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,6 +56,8 @@

import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule;
Expand All @@ -79,6 +82,7 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -465,6 +469,79 @@ void testRequiresCompression(final CompressionType compressionType, final boolea
assertThat(requires).isEqualTo(expectedResult);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingIndexes(final boolean encryption) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say these tests should not be in this class but somewhere in /test directory since these are plain unit tests.

final var config = new HashMap<>(Map.of(
"chunk.size", "10",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString(),
"encryption.enabled", Boolean.toString(encryption)
));
final SegmentEncryptionMetadataV1 encryptionMetadata;
if (encryption) {
config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
} else {
encryptionMetadata = null;
}
rsm.configure(config);

final var segmentIndexBuilder = new SegmentIndexesV1Builder();
final var bytes = "test".getBytes();
final var is = rsm.transformIndex(
IndexType.OFFSET,
new ByteArrayInputStream(bytes),
bytes.length,
encryptionMetadata,
segmentIndexBuilder
);
assertThat(is).isNotEmpty();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testTransformingEmptyIndexes(final boolean encryption) {
    // Same configuration wiring as testTransformingIndexes, parameterized
    // on whether encryption is enabled.
    final var config = new HashMap<>(Map.of(
        "chunk.size", "10",
        "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
        "storage.root", targetDir.toString(),
        "encryption.enabled", Boolean.toString(encryption)
    ));
    final SegmentEncryptionMetadataV1 encryptionMetadata;
    if (encryption) {
        config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
        config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
        config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
        config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
        final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
        encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
    } else {
        encryptionMetadata = null;
    }
    rsm.configure(config);

    // A zero-length index must come back as an empty stream.
    final var indexBuilder = new SegmentIndexesV1Builder();
    final var transformed = rsm.transformIndex(
        IndexType.OFFSET,
        InputStream.nullInputStream(),
        0,
        encryptionMetadata,
        indexBuilder
    );
    assertThat(transformed).isEmpty();
}

@Test
void testGetIndexSizeWithInvalidPaths() {
    // Sizing a file that does not exist must surface as a RemoteStorageException
    // wrapping the underlying I/O failure.
    final var missingPath = Path.of("non-exist");
    assertThatThrownBy(() -> RemoteStorageManager.indexSize(missingPath))
        .isInstanceOf(RemoteStorageException.class)
        .hasMessage("Error while getting index path size");
}

@Test
void testFetchingSegmentFileNonExistent() throws IOException {
final var config = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -247,37 +248,37 @@ private SegmentIndexesV1 uploadIndexes(
final LogSegmentData segmentData,
final SegmentEncryptionMetadataV1 encryptionMeta,
final SegmentCustomMetadataBuilder customMetadataBuilder
) throws IOException, StorageBackendException {
) throws IOException, RemoteStorageException, StorageBackendException {
final List<InputStream> indexes = new ArrayList<>(IndexType.values().length);
final SegmentIndexesV1Builder segmentIndexBuilder = new SegmentIndexesV1Builder();
final var offsetIndex = transformIndex(
IndexType.OFFSET,
Files.newInputStream(segmentData.offsetIndex()),
(int) Files.size(segmentData.offsetIndex()),
indexSize(segmentData.offsetIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(offsetIndex);
final var timeIndex = transformIndex(
IndexType.TIMESTAMP,
Files.newInputStream(segmentData.timeIndex()),
(int) Files.size(segmentData.timeIndex()),
indexSize(segmentData.timeIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(timeIndex);
final var producerSnapshotIndex = transformIndex(
IndexType.PRODUCER_SNAPSHOT,
Files.newInputStream(segmentData.producerSnapshotIndex()),
(int) Files.size(segmentData.producerSnapshotIndex()),
indexSize(segmentData.producerSnapshotIndex()),
encryptionMeta,
segmentIndexBuilder
);
indexes.add(producerSnapshotIndex);
final var leaderEpoch = transformIndex(
IndexType.LEADER_EPOCH,
new ByteBufferInputStream(segmentData.leaderEpochIndex()),
segmentData.leaderEpochIndex().capacity(),
segmentData.leaderEpochIndex().remaining(),
encryptionMeta,
segmentIndexBuilder
);
Expand All @@ -286,7 +287,7 @@ private SegmentIndexesV1 uploadIndexes(
final var transactionIndex = transformIndex(
IndexType.TRANSACTION,
Files.newInputStream(segmentData.transactionIndex().get()),
(int) Files.size(segmentData.transactionIndex().get()),
indexSize(segmentData.transactionIndex().get()),
encryptionMeta,
segmentIndexBuilder
);
Expand All @@ -308,6 +309,21 @@ private SegmentIndexesV1 uploadIndexes(
return segmentIndexBuilder.build();
}

/**
 * Returns the size of the index file at {@code indexPath} as an {@code int}.
 *
 * @param indexPath path to the index file on local disk
 * @return the file size in bytes, guaranteed to fit in an {@code int}
 * @throws IllegalStateException if the file is larger than {@link Integer#MAX_VALUE} bytes
 * @throws RemoteStorageException if the size cannot be read (e.g. the file does not exist)
 */
static int indexSize(final Path indexPath) throws RemoteStorageException {
    final long size;
    try {
        size = Files.size(indexPath);
    } catch (final IOException e) {
        throw new RemoteStorageException("Error while getting index path size", e);
    }
    // IllegalStateException deliberately escapes the IOException handler above.
    if (size > Integer.MAX_VALUE) {
        throw new IllegalStateException(
            "Index at path "
                + indexPath
                + " has size larger than Integer.MAX_VALUE");
    }
    return (int) size;
}

private Optional<CustomMetadata> buildCustomMetadata(final SegmentCustomMetadataBuilder customMetadataBuilder) {
final var customFields = customMetadataBuilder.build();
if (!customFields.isEmpty()) {
Expand Down Expand Up @@ -355,11 +371,15 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
}
}

private InputStream transformIndex(final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder) {
InputStream transformIndex(final IndexType indexType,
final InputStream index,
final int size,
final SegmentEncryptionMetadata encryptionMetadata,
final SegmentIndexesV1Builder segmentIndexBuilder) {
log.debug("Transforming index {} with size {}", indexType, size);
if (size == 0) {
return InputStream.nullInputStream();
}
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size);
if (encryptionEnabled) {
final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad());
Expand Down