Skip to content

Commit

Permalink
FIX: decouple msk auth from glue auth in KafkaSource (#4613)
Browse files Browse the repository at this point in the history
* FIX: decouple msk from aws block

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Jun 12, 2024
1 parent 3ef20b0 commit af7d1b5
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public class KafkaSecurityConfigurer {
private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location";
private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password";

private static AwsCredentialsProvider credentialsProvider;
private static AwsCredentialsProvider mskCredentialsProvider;
private static AwsCredentialsProvider awsGlueCredentialsProvider;
private static GlueSchemaRegistryKafkaDeserializer glueDeserializer;


Expand Down Expand Up @@ -207,6 +208,9 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu
properties.put(SASL_MECHANISM, "AWS_MSK_IAM");
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) {
if (Objects.isNull(awsConfig)) {
throw new RuntimeException("AWS Config needs to be specified when sasl/aws_msk_iam is set to \"role\"");
}
String baseIamAuthConfig = "software.amazon.msk.auth.iam.IAMLoginModule required " +
"awsRoleArn=\"%s\" " +
"awsStsRegion=\"%s\"";
Expand All @@ -225,14 +229,16 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu
}
}

public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig, final Logger LOG) {
if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) {
private static void configureMSKCredentialsProvider(final AuthConfig authConfig, final AwsConfig awsConfig) {
mskCredentialsProvider = DefaultCredentialsProvider.create();
if (Objects.nonNull(authConfig) && Objects.nonNull(authConfig.getSaslAuthConfig()) &&
authConfig.getSaslAuthConfig().getAwsIamAuthConfig() == AwsIamAuthConfig.ROLE) {
String sessionName = "data-prepper-kafka-session" + UUID.randomUUID();
StsClient stsClient = StsClient.builder()
.region(Region.of(awsConfig.getRegion()))
.credentialsProvider(credentialsProvider)
.credentialsProvider(mskCredentialsProvider)
.build();
credentialsProvider = StsAssumeRoleCredentialsProvider
mskCredentialsProvider = StsAssumeRoleCredentialsProvider
.builder()
.stsClient(stsClient)
.refreshRequest(
Expand All @@ -242,12 +248,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
.roleSessionName(sessionName)
.build()
).build();
} else if (awsIamAuthConfig != AwsIamAuthConfig.DEFAULT) {
throw new RuntimeException("Unknown AWS IAM auth mode");
}
}

public static String getBootStrapServersForMsk(final AwsConfig awsConfig,
final AwsCredentialsProvider mskCredentialsProvider,
final Logger log) {
final AwsConfig.AwsMskConfig awsMskConfig = awsConfig.getAwsMskConfig();
KafkaClient kafkaClient = KafkaClient.builder()
.credentialsProvider(credentialsProvider)
.credentialsProvider(mskCredentialsProvider)
.region(Region.of(awsConfig.getRegion()))
.build();
final GetBootstrapBrokersRequest request =
Expand All @@ -264,7 +273,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
try {
result = kafkaClient.getBootstrapBrokers(request);
} catch (KafkaException | StsException e) {
LOG.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e);
log.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e);
try {
Thread.sleep(10000);
} catch (InterruptedException exp) {}
Expand Down Expand Up @@ -302,16 +311,19 @@ public static void setDynamicSaslClientCallbackHandler(final Properties properti
}
}
}
public static void setAuthProperties(final Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger LOG) {
public static void setAuthProperties(final Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger log) {
final AwsConfig awsConfig = kafkaClusterAuthConfig.getAwsConfig();
final AuthConfig authConfig = kafkaClusterAuthConfig.getAuthConfig();
final EncryptionConfig encryptionConfig = kafkaClusterAuthConfig.getEncryptionConfig();
credentialsProvider = DefaultCredentialsProvider.create();
configureMSKCredentialsProvider(authConfig, awsConfig);

String bootstrapServers = "";
if (Objects.nonNull(kafkaClusterAuthConfig.getBootstrapServers())) {
bootstrapServers = String.join(",", kafkaClusterAuthConfig.getBootstrapServers());
}
if (Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getAwsMskConfig())) {
bootstrapServers = getBootStrapServersForMsk(awsConfig, mskCredentialsProvider, log);
}

if (Objects.nonNull(authConfig)) {
final AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig();
Expand All @@ -323,11 +335,7 @@ public static void setAuthProperties(final Properties properties, final KafkaClu
if (checkEncryptionType(encryptionConfig, EncryptionType.NONE)) {
throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism");
}
if (Objects.isNull(awsConfig)) {
throw new RuntimeException("AWS Config is not specified");
}
setAwsIamAuthProperties(properties, awsIamAuthConfig, awsConfig);
bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig, LOG);
} else if (Objects.nonNull(saslAuthConfig.getOAuthConfig())) {
setOauthProperties(kafkaClusterAuthConfig, properties);
} else if (Objects.nonNull(plainTextAuthConfig) && Objects.nonNull(kafkaClusterAuthConfig.getEncryptionConfig())) {
Expand Down Expand Up @@ -358,19 +366,45 @@ private static boolean checkEncryptionType(final EncryptionConfig encryptionConf
}

public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaConsumerConfig kafkaConsumerConfig) {
configureAwsGlueCredentialsProvider(kafkaConsumerConfig.getAwsConfig());
SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig();
if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) {
return null;
}
Map<String, Object> configs = new HashMap<>();
configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion());
final AwsConfig awsConfig = kafkaConsumerConfig.getAwsConfig();
if (Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getRegion())) {
configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion());
}
configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(awsGlueCredentialsProvider, configs);
return glueDeserializer;
}

private static void configureAwsGlueCredentialsProvider(final AwsConfig awsConfig) {
awsGlueCredentialsProvider = DefaultCredentialsProvider.create();
if (Objects.nonNull(awsConfig) &&
Objects.nonNull(awsConfig.getRegion()) && Objects.nonNull(awsConfig.getStsRoleArn())) {
String sessionName = "data-prepper-kafka-session" + UUID.randomUUID();
StsClient stsClient = StsClient.builder()
.region(Region.of(awsConfig.getRegion()))
.credentialsProvider(awsGlueCredentialsProvider)
.build();
awsGlueCredentialsProvider = StsAssumeRoleCredentialsProvider
.builder()
.stsClient(stsClient)
.refreshRequest(
AssumeRoleRequest
.builder()
.roleArn(awsConfig.getStsRoleArn())
.roleSessionName(sessionName)
.build()
).build();
}
}

}

Loading

0 comments on commit af7d1b5

Please sign in to comment.