Skip to content

Commit

Permalink
KAFKA-7896; Add sasl.jaas.config/sasl.mechanism props to the log4j ka…
Browse files Browse the repository at this point in the history
…fka appender

This patch adds 2 props to the log4j kafka appender that get put directly
into the sasl properties passed to the producer:
    - ClientJaasConf: This property sets sasl.jaas.config
    - SaslMechanim: This property sets sasl.mechanism

Author: Rohan Desai <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>

Closes #6216 from rodesai/add-kafka-appender-security-props
  • Loading branch information
rodesai authored and omkreddy committed Feb 12, 2019
1 parent d2575f0 commit 08036fa
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
Expand All @@ -63,7 +65,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String sslKeystoreLocation;
private String sslKeystorePassword;
private String saslKerberosServiceName;
private String saslMechanism;
private String clientJaasConfPath;
private String clientJaasConf;
private String kerb5ConfPath;
private Integer maxBlockMs;

Expand Down Expand Up @@ -210,6 +214,22 @@ public String getClientJaasConfPath() {
return clientJaasConfPath;
}

public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}

public String getSaslMechanism() {
return this.saslMechanism;
}

public void setClientJaasConf(final String clientJaasConf) {
this.clientJaasConf = clientJaasConf;
}

public String getClientJaasConf() {
return this.clientJaasConf;
}

public String getKerb5ConfPath() {
return kerb5ConfPath;
}
Expand Down Expand Up @@ -257,9 +277,15 @@ public void activateOptions() {
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
if (kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
}
if (kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
if (saslMechanism != null) {
props.put(SASL_MECHANISM, saslMechanism);
}
if (clientJaasConf != null) {
props.put(SASL_JAAS_CONFIG, clientJaasConf);
}
if (maxBlockMs != null) {
props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
*/
package org.apache.kafka.log4jappender;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.helpers.LogLog;
Expand Down Expand Up @@ -77,6 +82,45 @@ public void testKafkaLog4jConfigs() {
}
}

@Test
public void testSetSaslMechanism() {
Properties props = getLog4jConfig(false);
props.put("log4j.appender.KAFKA.SaslMechanism", "PLAIN");
PropertyConfigurator.configure(props);

MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
Assert.assertThat(
mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_MECHANISM),
equalTo("PLAIN"));
}

@Test
public void testSaslMechanismNotSet() {
testProducerPropertyNotSet(SaslConfigs.SASL_MECHANISM);
}

@Test
public void testSetJaasConfig() {
Properties props = getLog4jConfig(false);
props.put("log4j.appender.KAFKA.ClientJaasConf", "jaas-config");
PropertyConfigurator.configure(props);

MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
Assert.assertThat(
mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG),
equalTo("jaas-config"));
}

@Test
public void testJaasConfigNotSet() {
testProducerPropertyNotSet(SaslConfigs.SASL_JAAS_CONFIG);
}

private void testProducerPropertyNotSet(String name) {
PropertyConfigurator.configure(getLog4jConfig(false));
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
Assert.assertThat(mockKafkaLog4jAppender.getProducerProperties().stringPropertyNames(), not(hasItem(name)));
}

@Test
public void testLog4jAppends() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
private MockProducer<byte[], byte[]> mockProducer =
new MockProducer<>(false, new MockSerializer(), new MockSerializer());

private Properties producerProperties;

@Override
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
producerProperties = props;
return mockProducer;
}

Expand All @@ -49,4 +52,8 @@ protected void append(LoggingEvent event) {
List<ProducerRecord<byte[], byte[]>> getHistory() {
return mockProducer.history();
}

public Properties getProducerProperties() {
return producerProperties;
}
}

0 comments on commit 08036fa

Please sign in to comment.