From af7d1b54561a52ac13e7ba04d2b73a77cb0dc5cb Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Wed, 12 Jun 2024 14:46:08 -0500 Subject: [PATCH] FIX: decouple msk auth from glue auth in KafkaSource (#4613) * FIX: decouple msk from aws block Signed-off-by: George Chen --- .../kafka/util/KafkaSecurityConfigurer.java | 68 ++++++-- .../util/KafkaSecurityConfigurerTest.java | 145 ++++++++++++++++++ ...peline-bootstrap-servers-glue-default.yaml | 14 ++ ...ootstrap-servers-glue-sts-assume-role.yaml | 17 ++ ...ine-bootstrap-servers-override-by-msk.yaml | 20 +++ ...ne-bootstrap-servers-sasl-iam-default.yaml | 15 ++ ...eline-bootstrap-servers-sasl-iam-role.yaml | 18 +++ ...line-msk-default-glue-sts-assume-role.yaml | 20 +++ .../kafka-pipeline-msk-sasl-plain.yaml | 20 +++ 9 files changed, 320 insertions(+), 17 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-default.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-sts-assume-role.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-override-by-msk.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-default.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-default-glue-sts-assume-role.yaml create mode 100644 data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-sasl-plain.yaml diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java index a5e27e4d98..402f248ddf 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurer.java @@ -92,7 +92,8 @@ public class KafkaSecurityConfigurer { private static final String SSL_TRUSTSTORE_LOCATION = "ssl.truststore.location"; private static final String SSL_TRUSTSTORE_PASSWORD = "ssl.truststore.password"; - private static AwsCredentialsProvider credentialsProvider; + private static AwsCredentialsProvider mskCredentialsProvider; + private static AwsCredentialsProvider awsGlueCredentialsProvider; private static GlueSchemaRegistryKafkaDeserializer glueDeserializer; @@ -207,6 +208,9 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu properties.put(SASL_MECHANISM, "AWS_MSK_IAM"); properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { + if (Objects.isNull(awsConfig)) { + throw new RuntimeException("AWS Config needs to be specified when sasl/aws_msk_iam is set to \"role\""); + } String baseIamAuthConfig = "software.amazon.msk.auth.iam.IAMLoginModule required " + "awsRoleArn=\"%s\" " + "awsStsRegion=\"%s\""; @@ -225,14 +229,16 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu } } - public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig, final Logger LOG) { - if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { + private static void configureMSKCredentialsProvider(final AuthConfig authConfig, final AwsConfig awsConfig) { + mskCredentialsProvider = DefaultCredentialsProvider.create(); + if (Objects.nonNull(authConfig) && Objects.nonNull(authConfig.getSaslAuthConfig()) && + authConfig.getSaslAuthConfig().getAwsIamAuthConfig() == AwsIamAuthConfig.ROLE) { String sessionName = "data-prepper-kafka-session" + UUID.randomUUID(); StsClient stsClient = StsClient.builder() .region(Region.of(awsConfig.getRegion())) - .credentialsProvider(credentialsProvider) + .credentialsProvider(mskCredentialsProvider) .build(); - credentialsProvider = StsAssumeRoleCredentialsProvider + mskCredentialsProvider = StsAssumeRoleCredentialsProvider .builder() .stsClient(stsClient) .refreshRequest( @@ -242,12 +248,15 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth .roleSessionName(sessionName) .build() ).build(); - } else if (awsIamAuthConfig != AwsIamAuthConfig.DEFAULT) { - throw new RuntimeException("Unknown AWS IAM auth mode"); } + } + + public static String getBootStrapServersForMsk(final AwsConfig awsConfig, + final AwsCredentialsProvider mskCredentialsProvider, + final Logger log) { final AwsConfig.AwsMskConfig awsMskConfig = awsConfig.getAwsMskConfig(); KafkaClient kafkaClient = KafkaClient.builder() - .credentialsProvider(credentialsProvider) + .credentialsProvider(mskCredentialsProvider) .region(Region.of(awsConfig.getRegion())) .build(); final GetBootstrapBrokersRequest request = @@ -264,7 +273,7 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth try { result = kafkaClient.getBootstrapBrokers(request); } catch (KafkaException | StsException e) { - LOG.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e); + log.info("Failed to get bootstrap server information from MSK. Will try every 10 seconds for {} seconds", 10*MAX_KAFKA_CLIENT_RETRIES, e); try { Thread.sleep(10000); } catch (InterruptedException exp) {} @@ -302,16 +311,19 @@ public static void setDynamicSaslClientCallbackHandler(final Properties properti } } } - public static void setAuthProperties(final Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger LOG) { + public static void setAuthProperties(final Properties properties, final KafkaClusterAuthConfig kafkaClusterAuthConfig, final Logger log) { final AwsConfig awsConfig = kafkaClusterAuthConfig.getAwsConfig(); final AuthConfig authConfig = kafkaClusterAuthConfig.getAuthConfig(); final EncryptionConfig encryptionConfig = kafkaClusterAuthConfig.getEncryptionConfig(); - credentialsProvider = DefaultCredentialsProvider.create(); + configureMSKCredentialsProvider(authConfig, awsConfig); String bootstrapServers = ""; if (Objects.nonNull(kafkaClusterAuthConfig.getBootstrapServers())) { bootstrapServers = String.join(",", kafkaClusterAuthConfig.getBootstrapServers()); } + if (Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getAwsMskConfig())) { + bootstrapServers = getBootStrapServersForMsk(awsConfig, mskCredentialsProvider, log); + } if (Objects.nonNull(authConfig)) { final AuthConfig.SaslAuthConfig saslAuthConfig = authConfig.getSaslAuthConfig(); @@ -323,11 +335,7 @@ public static void setAuthProperties(final Properties properties, final KafkaClu if (checkEncryptionType(encryptionConfig, EncryptionType.NONE)) { throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism"); } - if (Objects.isNull(awsConfig)) { - throw new RuntimeException("AWS Config is not specified"); - } setAwsIamAuthProperties(properties, awsIamAuthConfig, awsConfig); - bootstrapServers = getBootStrapServersForMsk(awsIamAuthConfig, awsConfig, LOG); } else if (Objects.nonNull(saslAuthConfig.getOAuthConfig())) { setOauthProperties(kafkaClusterAuthConfig, properties); } else if (Objects.nonNull(plainTextAuthConfig) && Objects.nonNull(kafkaClusterAuthConfig.getEncryptionConfig())) { @@ -358,19 +366,45 @@ private static boolean checkEncryptionType(final EncryptionConfig encryptionConf } public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaConsumerConfig kafkaConsumerConfig) { + configureAwsGlueCredentialsProvider(kafkaConsumerConfig.getAwsConfig()); SchemaConfig schemaConfig = kafkaConsumerConfig.getSchemaConfig(); if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) { return null; } Map configs = new HashMap<>(); - configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion()); + final AwsConfig awsConfig = kafkaConsumerConfig.getAwsConfig(); + if (Objects.nonNull(awsConfig) && Objects.nonNull(awsConfig.getRegion())) { + configs.put(AWSSchemaRegistryConstants.AWS_REGION, kafkaConsumerConfig.getAwsConfig().getRegion()); + } configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); - glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs); + glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(awsGlueCredentialsProvider, configs); return glueDeserializer; } + private static void configureAwsGlueCredentialsProvider(final AwsConfig awsConfig) { + awsGlueCredentialsProvider = DefaultCredentialsProvider.create(); + if (Objects.nonNull(awsConfig) && + Objects.nonNull(awsConfig.getRegion()) && Objects.nonNull(awsConfig.getStsRoleArn())) { + String sessionName = "data-prepper-kafka-session" + UUID.randomUUID(); + StsClient stsClient = StsClient.builder() + .region(Region.of(awsConfig.getRegion())) + .credentialsProvider(awsGlueCredentialsProvider) + .build(); + awsGlueCredentialsProvider = StsAssumeRoleCredentialsProvider + .builder() + .stsClient(stsClient) + .refreshRequest( + AssumeRoleRequest + .builder() + .roleArn(awsConfig.getStsRoleArn()) + .roleSessionName(sessionName) + .build() + ).build(); + } + } + } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java index f1a9af8436..298457e21e 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSecurityConfigurerTest.java @@ -1,8 +1,11 @@ package org.opensearch.dataprepper.plugins.kafka.util; +import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -19,6 +22,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.services.kafka.KafkaClient; +import software.amazon.awssdk.services.kafka.KafkaClientBuilder; +import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersRequest; +import software.amazon.awssdk.services.kafka.model.GetBootstrapBrokersResponse; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import java.io.FileReader; import java.io.IOException; @@ -27,12 +38,16 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.UUID; import static org.apache.kafka.common.config.SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; @@ -128,6 +143,136 @@ public void testSetAuthPropertiesAuthSslWithNoCertContentNoTrustStore() throws E assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); } + @Test + public void testSetAuthPropertiesBootstrapServersWithSaslIAMRole() throws IOException { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("bootstrap.servers"), is("localhost:9092")); + assertThat(props.getProperty("sasl.mechanism"), is("AWS_MSK_IAM")); + assertThat(props.getProperty("sasl.jaas.config"), + is("software.amazon.msk.auth.iam.IAMLoginModule required " + + "awsRoleArn=\"test_sasl_iam_sts_role\" awsStsRegion=\"us-east-2\";")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + assertThat(props.get("sasl.client.callback.handler.class"), + is("software.amazon.msk.auth.iam.IAMClientCallbackHandler")); + } + + @Test + public void testSetAuthPropertiesBootstrapServersWithSaslIAMDefault() throws IOException { + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-bootstrap-servers-sasl-iam-default.yaml"); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + assertThat(props.getProperty("bootstrap.servers"), is("localhost:9092")); + assertThat(props.getProperty("sasl.jaas.config"), is("software.amazon.msk.auth.iam.IAMLoginModule required;")); + assertThat(props.getProperty("sasl.mechanism"), is("AWS_MSK_IAM")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + assertThat(props.get("sasl.client.callback.handler.class"), + is("software.amazon.msk.auth.iam.IAMClientCallbackHandler")); + } + + @Test + public void testSetAuthPropertiesBootstrapServersOverrideByMSK() throws IOException { + final String testMSKEndpoint = UUID.randomUUID().toString(); + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-bootstrap-servers-override-by-msk.yaml"); + final KafkaClientBuilder kafkaClientBuilder = mock(KafkaClientBuilder.class); + final KafkaClient kafkaClient = mock(KafkaClient.class); + when(kafkaClientBuilder.credentialsProvider(any())).thenReturn(kafkaClientBuilder); + when(kafkaClientBuilder.region(any(Region.class))).thenReturn(kafkaClientBuilder); + when(kafkaClientBuilder.build()).thenReturn(kafkaClient); + final GetBootstrapBrokersResponse response = mock(GetBootstrapBrokersResponse.class); + when(response.bootstrapBrokerStringSaslIam()).thenReturn(testMSKEndpoint); + when(kafkaClient.getBootstrapBrokers(any(GetBootstrapBrokersRequest.class))).thenReturn(response); + try (MockedStatic mockedKafkaClient = mockStatic(KafkaClient.class)) { + mockedKafkaClient.when(KafkaClient::builder).thenReturn(kafkaClientBuilder); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + } + assertThat(props.getProperty("bootstrap.servers"), is(testMSKEndpoint)); + assertThat(props.getProperty("sasl.mechanism"), is("AWS_MSK_IAM")); + assertThat(props.getProperty("sasl.jaas.config"), + is("software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn=\"sts_role_arn\" awsStsRegion=\"us-east-2\";")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + assertThat(props.get("sasl.client.callback.handler.class"), + is("software.amazon.msk.auth.iam.IAMClientCallbackHandler")); + } + + @Test + public void testSetAuthPropertiesMskWithSaslPlain() throws IOException { + final String testMSKEndpoint = UUID.randomUUID().toString(); + final Properties props = new Properties(); + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig("kafka-pipeline-msk-sasl-plain.yaml"); + final KafkaClientBuilder kafkaClientBuilder = mock(KafkaClientBuilder.class); + final KafkaClient kafkaClient = mock(KafkaClient.class); + when(kafkaClientBuilder.credentialsProvider(any())).thenReturn(kafkaClientBuilder); + when(kafkaClientBuilder.region(any(Region.class))).thenReturn(kafkaClientBuilder); + when(kafkaClientBuilder.build()).thenReturn(kafkaClient); + final GetBootstrapBrokersResponse response = mock(GetBootstrapBrokersResponse.class); + when(response.bootstrapBrokerStringSaslIam()).thenReturn(testMSKEndpoint); + when(kafkaClient.getBootstrapBrokers(any(GetBootstrapBrokersRequest.class))).thenReturn(response); + try (MockedStatic mockedKafkaClient = mockStatic(KafkaClient.class)) { + mockedKafkaClient.when(KafkaClient::builder).thenReturn(kafkaClientBuilder); + KafkaSecurityConfigurer.setAuthProperties(props, kafkaSourceConfig, LOG); + } + assertThat(props.getProperty("bootstrap.servers"), is(testMSKEndpoint)); + assertThat(props.getProperty("sasl.mechanism"), is("PLAIN")); + assertThat(props.getProperty("sasl.jaas.config"), + is("org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"test_sasl_username\" password=\"test_sasl_password\";")); + assertThat(props.getProperty("security.protocol"), is("SASL_SSL")); + assertThat(props.getProperty("certificateContent"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.location"), is(nullValue())); + assertThat(props.getProperty("ssl.truststore.password"), is(nullValue())); + assertThat(props.get("ssl.engine.factory.class"), is(nullValue())); + } + + @ParameterizedTest + @ValueSource(strings = { + "kafka-pipeline-bootstrap-servers-glue-sts-assume-role.yaml", + "kafka-pipeline-msk-default-glue-sts-assume-role.yaml" + }) + void testGetGlueSerializerWithStsAssumeRoleCredentialsProvider(final String filename) throws IOException { + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig(filename); + final GlueSchemaRegistryKafkaDeserializer glueSchemaRegistryKafkaDeserializer = KafkaSecurityConfigurer + .getGlueSerializer(kafkaSourceConfig); + assertThat(glueSchemaRegistryKafkaDeserializer, notNullValue()); + assertThat(glueSchemaRegistryKafkaDeserializer.getCredentialProvider(), + instanceOf(StsAssumeRoleCredentialsProvider.class)); + } + + @Test + void testGetGlueSerializerWithDefaultCredentialsProvider() throws IOException { + final KafkaSourceConfig kafkaSourceConfig = createKafkaSinkConfig( + "kafka-pipeline-bootstrap-servers-glue-default.yaml"); + final DefaultAwsRegionProviderChain.Builder defaultAwsRegionProviderChainBuilder = mock( + DefaultAwsRegionProviderChain.Builder.class); + final DefaultAwsRegionProviderChain defaultAwsRegionProviderChain = mock(DefaultAwsRegionProviderChain.class); + when(defaultAwsRegionProviderChainBuilder.build()).thenReturn(defaultAwsRegionProviderChain); + when(defaultAwsRegionProviderChain.getRegion()).thenReturn(Region.US_EAST_1); + try (MockedStatic defaultAwsRegionProviderChainMockedStatic = + mockStatic(DefaultAwsRegionProviderChain.class)) { + defaultAwsRegionProviderChainMockedStatic.when(DefaultAwsRegionProviderChain::builder) + .thenReturn(defaultAwsRegionProviderChainBuilder); + final GlueSchemaRegistryKafkaDeserializer glueSchemaRegistryKafkaDeserializer = KafkaSecurityConfigurer + .getGlueSerializer(kafkaSourceConfig); + assertThat(glueSchemaRegistryKafkaDeserializer, notNullValue()); + assertThat(glueSchemaRegistryKafkaDeserializer.getCredentialProvider(), + instanceOf(DefaultCredentialsProvider.class)); + } + } + @Test void testSetDynamicSaslClientCallbackHandlerWithNonNullPlainTextAuthConfig() { when(kafkaConnectionConfig.getAuthConfig()).thenReturn(authConfig); diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-default.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-default.yaml new file mode 100644 index 0000000000..5017333415 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-default.yaml @@ -0,0 +1,14 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + schema: + type: aws_glue + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-sts-assume-role.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-sts-assume-role.yaml new file mode 100644 index 0000000000..4fc036a9de --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-glue-sts-assume-role.yaml @@ -0,0 +1,17 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + aws: + region: us-east-2 + sts_role_arn: sts_role_arn + schema: + type: aws_glue + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-override-by-msk.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-override-by-msk.yaml new file mode 100644 index 0000000000..889fd0c044 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-override-by-msk.yaml @@ -0,0 +1,20 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + authentication: + sasl: + aws_msk_iam: role + aws: + region: us-east-2 + sts_role_arn: sts_role_arn + msk: + arn: service Arn + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-default.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-default.yaml new file mode 100644 index 0000000000..0edc808ce3 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-default.yaml @@ -0,0 +1,15 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + authentication: + sasl: + aws_msk_iam: default + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml new file mode 100644 index 0000000000..a4ef7fd94b --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-bootstrap-servers-sasl-iam-role.yaml @@ -0,0 +1,18 @@ +log-pipeline : + source: + kafka: + bootstrap_servers: + - "localhost:9092" + encryption: + type: "SSL" + authentication: + sasl: + aws_msk_iam: role + aws: + region: us-east-2 + sts_role_arn: test_sasl_iam_sts_role + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-default-glue-sts-assume-role.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-default-glue-sts-assume-role.yaml new file mode 100644 index 0000000000..bf94287f26 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-default-glue-sts-assume-role.yaml @@ -0,0 +1,20 @@ +log-pipeline : + source: + kafka: + encryption: + type: "SSL" + authentication: + sasl: + aws_msk_iam: default + aws: + region: us-east-2 + sts_role_arn: sts_role_arn + msk: + arn: service Arn + schema: + type: aws_glue + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-sasl-plain.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-sasl-plain.yaml new file mode 100644 index 0000000000..f1a44ff414 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/kafka-pipeline-msk-sasl-plain.yaml @@ -0,0 +1,20 @@ +log-pipeline : + source: + kafka: + encryption: + type: "SSL" + authentication: + sasl: + plain: + username: test_sasl_username + password: test_sasl_password + aws: + region: us-east-2 + sts_role_arn: sts_role_arn + msk: + arn: service Arn + topics: + - name: "quickstart-events" + group_id: "groupdID1" + sink: + stdout: \ No newline at end of file