Skip to content

Commit

Permalink
Extract security config from Kafka config
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-walkiewicz authored and kokosing committed Apr 14, 2021
1 parent 5c8ae0a commit 5435fc9
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@
import io.trino.plugin.kafka.security.SecurityProtocol;

import static io.airlift.configuration.ConditionalModule.installModuleIf;
import static io.airlift.configuration.ConfigBinder.configBinder;

public class KafkaClientsModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(KafkaSecurityConfig.class);
installClientModule(SecurityProtocol.PLAINTEXT, KafkaClientsModule::configurePlainText);
installClientModule(SecurityProtocol.SSL, KafkaClientsModule::configureSsl);
}

private void installClientModule(SecurityProtocol securityProtocol, Module module)
{
install(installModuleIf(
KafkaConfig.class,
KafkaSecurityConfig.class,
config -> config.getSecurityProtocol().equals(securityProtocol),
module));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier;
import io.trino.plugin.kafka.security.SecurityProtocol;
import io.trino.spi.HostAddress;

import javax.validation.constraints.Min;
Expand All @@ -32,7 +31,6 @@
import java.util.stream.StreamSupport;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.kafka.security.SecurityProtocol.PLAINTEXT;

@DefunctConfig("kafka.connect-timeout")
public class KafkaConfig
Expand All @@ -46,7 +44,6 @@ public class KafkaConfig
private int messagesPerSplit = 100_000;
private boolean timestampUpperBoundPushDownEnabled;
private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;
private SecurityProtocol securityProtocol = PLAINTEXT;

@Size(min = 1)
public Set<HostAddress> getNodes()
Expand Down Expand Up @@ -155,17 +152,4 @@ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperB
this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled;
return this;
}

@Config("kafka.security-protocol")
@ConfigDescription("Security protocol used for Kafka connection")
public KafkaConfig setSecurityProtocol(SecurityProtocol securityProtocol)
{
this.securityProtocol = securityProtocol;
return this;
}

public SecurityProtocol getSecurityProtocol()
{
return securityProtocol;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kafka;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.trino.plugin.kafka.security.SecurityProtocol;

import javax.validation.constraints.NotNull;

public class KafkaSecurityConfig
{
private SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;

@NotNull
public SecurityProtocol getSecurityProtocol()
{
return securityProtocol;
}

@Config("kafka.security-protocol")
@ConfigDescription("Kafka communication security protocol")
public KafkaSecurityConfig setSecurityProtocol(SecurityProtocol securityProtocol)
{
this.securityProtocol = securityProtocol;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ public class PlainTextKafkaAdminFactory
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaAdminFactory(KafkaConfig kafkaConfig)
public PlainTextKafkaAdminFactory(
KafkaConfig kafkaConfig,
KafkaSecurityConfig securityConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
requireNonNull(securityConfig, "securityConfig is null");

nodes = kafkaConfig.getNodes();
securityProtocol = kafkaConfig.getSecurityProtocol();
securityProtocol = securityConfig.getSecurityProtocol();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ public class PlainTextKafkaConsumerFactory
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaConsumerFactory(KafkaConfig kafkaConfig)
public PlainTextKafkaConsumerFactory(
KafkaConfig kafkaConfig,
KafkaSecurityConfig securityConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
requireNonNull(securityConfig, "securityConfig is null");

nodes = kafkaConfig.getNodes();
kafkaBufferSize = kafkaConfig.getKafkaBufferSize();
securityProtocol = kafkaConfig.getSecurityProtocol();
securityProtocol = securityConfig.getSecurityProtocol();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public class PlainTextKafkaProducerFactory
private final SecurityProtocol securityProtocol;

@Inject
public PlainTextKafkaProducerFactory(KafkaConfig kafkaConfig)
public PlainTextKafkaProducerFactory(KafkaConfig kafkaConfig, KafkaSecurityConfig securityConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
requireNonNull(securityConfig, "securityConfig is null");

nodes = kafkaConfig.getNodes();
securityProtocol = kafkaConfig.getSecurityProtocol();
securityProtocol = securityConfig.getSecurityProtocol();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.kafka.KafkaConfig;
import io.trino.plugin.kafka.KafkaSecurityConfig;

import static io.airlift.configuration.ConditionalModule.installModuleIf;
import static io.airlift.configuration.ConfigurationModule.installModules;
Expand All @@ -35,7 +35,7 @@ protected void setup(Binder binder)
private void bindSecurityModule(SecurityProtocol securityProtocol, Module module)
{
install(installModuleIf(
KafkaConfig.class,
KafkaSecurityConfig.class,
config -> config.getSecurityProtocol().equals(securityProtocol),
module));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.kafka.schema.file.FileTableDescriptionSupplier;
import io.trino.plugin.kafka.security.SecurityProtocol;
import org.testng.annotations.Test;

import java.util.Map;
Expand All @@ -36,8 +35,7 @@ public void testDefaults()
.setTableDescriptionSupplier(FileTableDescriptionSupplier.NAME)
.setHideInternalColumns(true)
.setMessagesPerSplit(100_000)
.setTimestampUpperBoundPushDownEnabled(false)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT));
.setTimestampUpperBoundPushDownEnabled(false));
}

@Test
Expand All @@ -51,7 +49,6 @@ public void testExplicitPropertyMappings()
.put("kafka.hide-internal-columns", "false")
.put("kafka.messages-per-split", "1")
.put("kafka.timestamp-upper-bound-force-push-down-enabled", "true")
.put("kafka.security-protocol", "SSL")
.build();

KafkaConfig expected = new KafkaConfig()
Expand All @@ -61,8 +58,7 @@ public void testExplicitPropertyMappings()
.setKafkaBufferSize("1MB")
.setHideInternalColumns(false)
.setMessagesPerSplit(1)
.setTimestampUpperBoundPushDownEnabled(true)
.setSecurityProtocol(SecurityProtocol.SSL);
.setTimestampUpperBoundPushDownEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.kafka.security.SecurityProtocol;
import org.testng.annotations.Test;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;

public class TestKafkaSecurityConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(KafkaSecurityConfig.class)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("kafka.security-protocol", "SSL")
.build();

KafkaSecurityConfig expected = new KafkaSecurityConfig()
.setSecurityProtocol(SecurityProtocol.SSL);

assertFullMapping(properties, expected);
}
}

0 comments on commit 5435fc9

Please sign in to comment.