diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java index dc7e3c727..5a733bd6e 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java @@ -88,6 +88,12 @@ public class S3StorageConfig extends AbstractConfig { + "Use with caution and always only in a test environment, as disabling certificate lead the storage " + "to be vulnerable to man-in-the-middle attacks."; + public static final String AWS_CHECKSUM_CHECK_ENABLED_CONFIG = "aws.checksum.check.enabled"; + private static final String AWS_CHECKSUM_CHECK_ENABLED_DOC = + "This property is used to enable checksum validation done by AWS library. " + + "When set to \"false\", there will be no validation. " + + "It is disabled by default as Kafka already validates integrity of the files."; + private static final ConfigDef CONFIG; @@ -166,6 +172,12 @@ public class S3StorageConfig extends AbstractConfig { true, ConfigDef.Importance.LOW, AWS_CERTIFICATE_CHECK_ENABLED_DOC + ) + .define(AWS_CHECKSUM_CHECK_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.MEDIUM, + AWS_CHECKSUM_CHECK_ENABLED_DOC ); } @@ -218,6 +230,8 @@ S3Client s3Client() { ); } + s3ClientBuilder.serviceConfiguration(builder -> builder.checksumValidationEnabled(checksumCheckEnabled())); + final AwsCredentialsProvider credentialsProvider = credentialsProvider(); if (credentialsProvider != null) { s3ClientBuilder.credentialsProvider(credentialsProvider); @@ -267,6 +281,10 @@ public Boolean certificateCheckEnabled() { return getBoolean(AWS_CERTIFICATE_CHECK_ENABLED_CONFIG); } + public Boolean checksumCheckEnabled() { + return getBoolean(AWS_CHECKSUM_CHECK_ENABLED_CONFIG); + } + public String bucketName() { return getString(S3_BUCKET_NAME_CONFIG); } diff --git a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfigTest.java b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfigTest.java index b61e0f7ee..32df124f0 100644 --- a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfigTest.java +++ b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfigTest.java @@ -53,6 +53,7 @@ void minimalConfig() { assertThat(config.pathStyleAccessEnabled()).isNull(); assertThat(config.uploadPartSize()).isEqualTo(S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT); assertThat(config.certificateCheckEnabled()).isTrue(); + assertThat(config.checksumCheckEnabled()).isFalse(); verifyClientConfiguration(config.s3Client(), null); } @@ -110,7 +111,8 @@ void configWithStaticCredentials() { "s3.endpoint.url", MINIO_URL, "aws.access.key.id", username, "aws.secret.access.key", password, - "aws.certificate.check.enabled", "false"); + "aws.certificate.check.enabled", "false", + "aws.checksum.check.enabled", "true"); final var config = new S3StorageConfig(configs); @@ -120,6 +122,7 @@ void configWithStaticCredentials() { assertThat(config.getPassword("aws.access.key.id").value()).isEqualTo(username); assertThat(config.getPassword("aws.secret.access.key").value()).isEqualTo(password); assertThat(config.certificateCheckEnabled()).isFalse(); + assertThat(config.checksumCheckEnabled()).isTrue(); final AwsCredentialsProvider credentialsProvider = config.credentialsProvider(); assertThat(credentialsProvider).isInstanceOf(StaticCredentialsProvider.class);