Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka source: support SASL/SCRAM mechanisms #4241

Closed
franky-m opened this issue Mar 6, 2024 · 6 comments · Fixed by #4912
Closed

Kafka source: support SASL/SCRAM mechanisms #4241

franky-m opened this issue Mar 6, 2024 · 6 comments · Fixed by #4912
Assignees
Labels
plugin - source A plugin to receive data from a service or location.
Milestone

Comments

@franky-m
Copy link
Contributor

franky-m commented Mar 6, 2024

Is your feature request related to a problem? Please describe.
Currently, Kafka source only supports SASL/PLAIN authentication mechanism, but apparently no SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512.

Describe the solution you'd like

Extend Data Prepper’s authentication options to include mechanisms such as SCRAM-SHA-512.

Example:

pipeline:
  name: kafka-pipeline
  source:
    kafka:
      bootstrap_servers:
        - 127.0.0.1:9093
      topics:
        - name: topic1
          group_id: groupID1
      authentication:
        sasl:
          plaintext:
            mechanism: SCRAM-SHA-512 # or SCRAM-SHA-256 or plain
            username: your_kafka_username
            password: your_kafka_password

Additional context

Many Kafka deployments rely on SCRAM mechanisms for improved security. Users who require SCRAM-SHA-512 authentication need this feature to seamlessly integrate Data Prepper into their existing Kafka infrastructure.

@dlvenable dlvenable added plugin - source A plugin to receive data from a service or location. and removed untriaged labels Mar 12, 2024
@dlvenable
Copy link
Member

@franky-m , This is a great idea. Would you be able to contribute a PR to help with this? We could give you some pointers in the code.

@franky-m
Copy link
Contributor Author

Hi could you give me the pointers you mentioned? I would try to implement the SASL/SCRAM support myself and if I succeed I would open a PR

@burandobata
Copy link

+1

@franky-m
Copy link
Contributor Author

franky-m commented Aug 5, 2024

Hi @dlvenable! Did you have time to collect the pointers you mentioned?

@dlvenable
Copy link
Member

@franky-m ,

Yes, I have some references.

First, I think that SASL/SCRAM is different from SASL/PLAIN. So the configuration should probably be a little different in the YAML.

It should have the following structure instead.

authentication:
  sasl:
    scram:
      username: your_kafka_username
      password: your_kafka_password

You can see where we add the current configuraiton in this block:

public static class SaslAuthConfig {
@JsonAlias("plain")
@JsonProperty("plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

You can add something like the following below there to add the scram option.

        @JsonProperty("scram")
        private ScramAuthConfig scramAuthConfig;

Here is code where we set the plain configuration into the Kafka properties:

private static void setPlainTextAuthProperties(final Properties properties, final PlainTextAuthConfig plainTextAuthConfig,
final EncryptionConfig encryptionConfig) {
final String username = plainTextAuthConfig.getUsername();
final String password = plainTextAuthConfig.getPassword();
properties.put(SASL_MECHANISM, "PLAIN");
properties.put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
if (checkEncryptionType(encryptionConfig, EncryptionType.SSL)) {
properties.put(SECURITY_PROTOCOL, "SASL_SSL");
setSecurityProtocolSSLProperties(properties, encryptionConfig);
} else { // EncryptionType.NONE
properties.put(SECURITY_PROTOCOL, "SASL_PLAINTEXT");
}
}

And this is the code where we call it.

} else if (Objects.nonNull(plainTextAuthConfig) && Objects.nonNull(kafkaClusterAuthConfig.getEncryptionConfig())) {
setPlainTextAuthProperties(properties, plainTextAuthConfig, kafkaClusterAuthConfig.getEncryptionConfig());

You could add a new condition that would look somewhat like:

...
} else if(Objects.nonNull(saslAuthConfig.getScramAuthConfig())) {
  setScramAuthProperties(properties, saslAuthConfig.getScramAuthConfig());  // new method; maybe it needs the encryption config too
} else ...

@dlvenable
Copy link
Member

@franky-m , We did make some changes to support dynamically updating the password if it changes in the underlying source (e.g. AWS Secrets Manager). It isn't necessary to have that implemented, but would be nice.

@chenqi0805 , Can you provide any guidance on how that would be implemented?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
plugin - source A plugin to receive data from a service or location.
Projects
Development

Successfully merging a pull request may close this issue.

3 participants