Skip to content

Commit

Permalink
Prevent configuration of QuarkusKafkaClientFactory if quarkus-kuberne…
Browse files Browse the repository at this point in the history
…tes-service-binding is not on the classpath

Fixes #2901
  • Loading branch information
jamesnetherton authored and ppalaga committed Jul 20, 2021
1 parent fa7a4e5 commit d6a9091
Show file tree
Hide file tree
Showing 19 changed files with 439 additions and 39 deletions.
18 changes: 18 additions & 0 deletions docs/modules/ROOT/pages/reference/extensions/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,21 @@ Or add the coordinates to your existing project:
----

Check the xref:user-guide/index.adoc[User guide] for more information about writing Camel Quarkus applications.

== Additional Camel Quarkus configuration

[width="100%",cols="80,5,15",options="header"]
|===
| Configuration property | Type | Default


| [[quarkus.camel.kafka.kubernetes-service-binding.merge-configuration]]`link:#quarkus.camel.kafka.kubernetes-service-binding.merge-configuration[quarkus.camel.kafka.kubernetes-service-binding.merge-configuration]`

If `true` then any Kafka configuration properties discovered by the Quarkus Kubernetes Service Binding extension (if configured) will be merged with those set via Camel Kafka component or endpoint options. If `false` then any Kafka configuration properties discovered by the Quarkus Kubernetes Service Binding extension are ignored, and all of the Kafka component configuration is driven by Camel.
| `boolean`
| `true`
|===

[.configuration-legend]
icon:lock[title=Fixed at build time] Configuration property fixed at build time. All other configuration properties are overridable at runtime.

6 changes: 6 additions & 0 deletions extensions/kafka/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.List;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.quarkus.component.kafka.CamelKafkaRecorder;
import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem;
import org.apache.camel.quarkus.component.kafka.KafkaClientFactoryProducer;
import org.apache.kafka.common.security.scram.internals.ScramSaslClient.ScramSaslClientFactory;

class KafkaProcessor {
Expand All @@ -39,15 +35,12 @@ FeatureBuildItem feature() {
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
CamelRuntimeBeanBuildItem createKafkaClientFactory(
CamelKafkaRecorder recorder,
// We want Quarkus to configure the ServiceBindingConverter bits before this step
List<ServiceProviderBuildItem> serviceProviders) {
return new CamelRuntimeBeanBuildItem(
"quarkusKafkaClientFactory",
KafkaClientFactory.class.getName(),
recorder.createKafkaClientFactory());
void createKafkaClientFactoryProducerBean(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBean) {
if (capabilities.isPresent(Capability.KUBERNETES_SERVICE_BINDING)) {
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(KafkaClientFactoryProducer.class));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.Arrays;
import java.util.Set;

import javax.inject.Inject;

import io.quarkus.bootstrap.model.AppArtifact;
import io.quarkus.builder.Version;
import io.quarkus.test.QuarkusUnitTest;
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class QuarkusKafkaClientFactoryDisabledMergeConfigTest {

@RegisterExtension
static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
.setForcedDependencies(Arrays.asList(
new AppArtifact("io.quarkus", "quarkus-kubernetes-service-binding", Version.getVersion())))
.withConfigurationResource("application-configuration-merging-disabled.properties")
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
CamelContext context;

@Test
public void quarkusKafkaClientFactoryRegistryBeanNull() {
Set<KafkaClientFactory> factories = context.getRegistry().findByType(KafkaClientFactory.class);
assertTrue(factories.isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.Set;

import javax.inject.Inject;

import io.quarkus.test.QuarkusUnitTest;
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class QuarkusKafkaClientFactoryDisabledTest {

@RegisterExtension
static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
.withConfigurationResource("application.properties")
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
CamelContext context;

@Test
public void quarkusKafkaClientFactoryRegistryBeanNull() {
Set<KafkaClientFactory> factories = context.getRegistry().findByType(KafkaClientFactory.class);
assertTrue(factories.isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.Arrays;
import java.util.Set;

import javax.inject.Inject;

import io.quarkus.bootstrap.model.AppArtifact;
import io.quarkus.builder.Version;
import io.quarkus.test.QuarkusUnitTest;
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.quarkus.component.kafka.QuarkusKafkaClientFactory;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class QuarkusKafkaClientFactoryEnabledTest {

@RegisterExtension
static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
.withConfigurationResource("application.properties")
.setForcedDependencies(Arrays.asList(
new AppArtifact("io.quarkus", "quarkus-kubernetes-service-binding", Version.getVersion())))
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
CamelContext context;

@Test
public void quarkusKafkaClientFactoryRegistryBeanNotNull() {
Set<KafkaClientFactory> factories = context.getRegistry().findByType(KafkaClientFactory.class);
assertEquals(1, factories.size());

KafkaClientFactory factory = factories.iterator().next();
assertTrue(factory instanceof QuarkusKafkaClientFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.kafka.deployment;

import java.util.Arrays;
import java.util.Properties;

import javax.inject.Inject;

import io.quarkus.bootstrap.model.AppArtifact;
import io.quarkus.builder.Version;
import io.quarkus.test.QuarkusUnitTest;
import org.apache.camel.component.kafka.KafkaClientFactory;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.quarkus.component.kafka.QuarkusKafkaClientFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class QuarkusKafkaClientFactoryTest {

@RegisterExtension
static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
.setForcedDependencies(Arrays.asList(
new AppArtifact("io.quarkus", "quarkus-kubernetes-service-binding", Version.getVersion())))
.withConfigurationResource("application-configuration-merging.properties")
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Inject
KafkaClientFactory factory;

@Test
public void testMergeConfiguration() {
assertNotNull(factory);

QuarkusKafkaClientFactory quarkusKafkaClientFactory = (QuarkusKafkaClientFactory) factory;

KafkaConfiguration configuration = new KafkaConfiguration();
configuration.setBrokers("camelhost:9999");
assertEquals("localhost:9092", quarkusKafkaClientFactory.getBrokers(configuration));

Properties properties = new Properties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "camel-quarkus-group");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
quarkusKafkaClientFactory.mergeConfiguration(properties);

assertEquals("camel-quarkus-group", properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
assertEquals("1000",
properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
quarkus.kafka.devservices.enabled=false
quarkus.camel.kafka.kubernetes-service-binding.merge-configuration=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
quarkus.kafka.devservices.enabled=false
kafka.bootstrap-servers=localhost:9092
kafka.client.id=camel-quarkus-client
kafka.request.timeout.ms=1000
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
quarkus.kafka.devservices.enabled=false
Loading

0 comments on commit d6a9091

Please sign in to comment.