Skip to content

Commit

Permalink
feat: add checksum check flag
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Sep 17, 2023
1 parent 5f64f5a commit ea1c61f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public class S3StorageConfig extends AbstractConfig {
+ "Use with caution and always only in a test environment, as disabling certificate lead the storage "
+ "to be vulnerable to man-in-the-middle attacks.";

public static final String AWS_CHECKSUM_CHECK_ENABLED_CONFIG = "aws.checksum.check.enabled";
private static final String AWS_CHECKSUM_CHECK_ENABLED_DOC =
"This property is used to enable checksum validation done by AWS library. " +
"When set to \"false\", there will be no validation. " +
"It is disabled by default as Kafka already validates integrity of the files.";


private static final ConfigDef CONFIG;

Expand Down Expand Up @@ -166,6 +172,12 @@ public class S3StorageConfig extends AbstractConfig {
true,
ConfigDef.Importance.LOW,
AWS_CERTIFICATE_CHECK_ENABLED_DOC
)
.define(AWS_CHECKSUM_CHECK_ENABLED_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.MEDIUM,
AWS_CHECKSUM_CHECK_ENABLED_DOC
);
}

Expand Down Expand Up @@ -218,6 +230,8 @@ S3Client s3Client() {
);
}

s3ClientBuilder.serviceConfiguration(builder -> builder.checksumValidationEnabled(checksumCheckEnabled()));

final AwsCredentialsProvider credentialsProvider = credentialsProvider();
if (credentialsProvider != null) {
s3ClientBuilder.credentialsProvider(credentialsProvider);
Expand Down Expand Up @@ -267,6 +281,10 @@ public Boolean certificateCheckEnabled() {
return getBoolean(AWS_CERTIFICATE_CHECK_ENABLED_CONFIG);
}

public Boolean checksumCheckEnabled() {
return getBoolean(AWS_CHECKSUM_CHECK_ENABLED_CONFIG);
}

public String bucketName() {
return getString(S3_BUCKET_NAME_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void minimalConfig() {
assertThat(config.pathStyleAccessEnabled()).isNull();
assertThat(config.uploadPartSize()).isEqualTo(S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
assertThat(config.certificateCheckEnabled()).isTrue();
assertThat(config.checksumCheckEnabled()).isFalse();
verifyClientConfiguration(config.s3Client(), null);
}

Expand Down Expand Up @@ -110,7 +111,8 @@ void configWithStaticCredentials() {
"s3.endpoint.url", MINIO_URL,
"aws.access.key.id", username,
"aws.secret.access.key", password,
"aws.certificate.check.enabled", "false");
"aws.certificate.check.enabled", "false",
"aws.checksum.check.enabled", "true");

final var config = new S3StorageConfig(configs);

Expand All @@ -120,6 +122,7 @@ void configWithStaticCredentials() {
assertThat(config.getPassword("aws.access.key.id").value()).isEqualTo(username);
assertThat(config.getPassword("aws.secret.access.key").value()).isEqualTo(password);
assertThat(config.certificateCheckEnabled()).isFalse();
assertThat(config.checksumCheckEnabled()).isTrue();

final AwsCredentialsProvider credentialsProvider = config.credentialsProvider();
assertThat(credentialsProvider).isInstanceOf(StaticCredentialsProvider.class);
Expand Down

0 comments on commit ea1c61f

Please sign in to comment.