From 900b910214a82a0b22a4b59426ebe162da191ac2 Mon Sep 17 00:00:00 2001 From: Zheng Feng Date: Wed, 19 Apr 2023 11:06:16 +0800 Subject: [PATCH] Fix #22 to add pooling support (cherry picked from commit 5410b285071a84dd1a67572402115bb151ed3b74) with fixups --- deployment/pom.xml | 4 +++ .../qpid/jms/deployment/QpidJmsProcessor.java | 30 +++++++++++++++++++ .../jms/runtime/ConnectionFactoryWrapper.java | 22 ++++++++++++++ .../qpid/jms/runtime/QpidJmsProducer.java | 10 ++++++- .../qpid/jms/runtime/QpidJmsRecorder.java | 29 ++++++++++++++++++ .../jms/runtime/QpidJmsRuntimeConfig.java | 7 +++++ 6 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/ConnectionFactoryWrapper.java create mode 100644 runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRecorder.java diff --git a/deployment/pom.xml b/deployment/pom.xml index bc17ea5..f9c347e 100644 --- a/deployment/pom.xml +++ b/deployment/pom.xml @@ -52,6 +52,10 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-jms-spi-deployment + io.quarkus quarkus-netty-deployment diff --git a/deployment/src/main/java/org/amqphub/quarkus/qpid/jms/deployment/QpidJmsProcessor.java b/deployment/src/main/java/org/amqphub/quarkus/qpid/jms/deployment/QpidJmsProcessor.java index 3b4d768..3b47a37 100644 --- a/deployment/src/main/java/org/amqphub/quarkus/qpid/jms/deployment/QpidJmsProcessor.java +++ b/deployment/src/main/java/org/amqphub/quarkus/qpid/jms/deployment/QpidJmsProcessor.java @@ -15,7 +15,13 @@ */ package org.amqphub.quarkus.qpid.jms.deployment; +import javax.enterprise.context.ApplicationScoped; +import javax.jms.ConnectionFactory; + +import org.amqphub.quarkus.qpid.jms.runtime.ConnectionFactoryWrapper; import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsProducer; +import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsRecorder; + import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.provider.amqp.AmqpProvider; @@ -50,15 +56,22 @@ import org.apache.qpid.proton.engine.impl.TransportImpl; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; +import io.quarkus.jms.spi.deployment.ConnectionFactoryWrapperBuildItem; + +import java.util.Optional; +import java.util.function.Function; public class QpidJmsProcessor { private static final String QPID_JMS = "qpid-jms"; @@ -81,6 +94,23 @@ AdditionalBeanBuildItem registerBean() { return AdditionalBeanBuildItem.unremovableOf(QpidJmsProducer.class); } + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + void connectionFactoryWrapper(Optional connectionFactoryWrapper, + QpidJmsRecorder recorder, + BuildProducer syntheticBeanProducer) { + if (connectionFactoryWrapper.isPresent()) { + Function wrapper = connectionFactoryWrapper.get().getWrapper(); + SyntheticBeanBuildItem.ExtendedBeanConfigurator configurator = SyntheticBeanBuildItem.configure(ConnectionFactoryWrapper.class) + .setRuntimeInit() + .defaultBean() + .unremovable() + .scope(ApplicationScoped.class) + .runtimeValue(recorder.getConnectionFactoryWrapper(wrapper)); + syntheticBeanProducer.produce(configurator.done()); + } + } + @BuildStep public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer reflectiveClass, BuildProducer delayedInitialisation, diff --git a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/ConnectionFactoryWrapper.java b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/ConnectionFactoryWrapper.java new file mode 100644 index 0000000..6f39cd7 --- /dev/null +++ b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/ConnectionFactoryWrapper.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.amqphub.quarkus.qpid.jms.runtime; + +import javax.jms.ConnectionFactory; + +public interface ConnectionFactoryWrapper { + ConnectionFactory wrap(ConnectionFactory connectionFactory); +} diff --git a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsProducer.java b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsProducer.java index 894ca69..7e84643 100644 --- a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsProducer.java +++ b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsProducer.java @@ -15,6 +15,7 @@ */ package org.amqphub.quarkus.qpid.jms.runtime; +import io.quarkus.arc.Arc; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Produces; import javax.inject.Inject; @@ -34,7 +35,14 @@ public class QpidJmsProducer { @ApplicationScoped @DefaultBean public ConnectionFactory connectionFactory() { - return new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url); + ConnectionFactory connectionFactory = new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url); + ConnectionFactoryWrapper wrapper = Arc.container().instance(ConnectionFactoryWrapper.class).get(); + + if (config.wrap && wrapper != null) { + return wrapper.wrap(connectionFactory); + } else { + return connectionFactory; + } } public QpidJmsRuntimeConfig getConfig() { diff --git a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRecorder.java b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRecorder.java new file mode 100644 index 0000000..26df340 --- /dev/null +++ b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRecorder.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.amqphub.quarkus.qpid.jms.runtime; + +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.annotations.Recorder; +import javax.jms.ConnectionFactory; + +import java.util.function.Function; + +@Recorder +public class QpidJmsRecorder { + public RuntimeValue getConnectionFactoryWrapper(Function wrapper) { + return new RuntimeValue<>(connectionFactory -> (ConnectionFactory) wrapper.apply(connectionFactory)); + } +} diff --git a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRuntimeConfig.java b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRuntimeConfig.java index f7e61c0..0dc5cc7 100644 --- a/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRuntimeConfig.java +++ b/runtime/src/main/java/org/amqphub/quarkus/qpid/jms/runtime/QpidJmsRuntimeConfig.java @@ -43,4 +43,11 @@ public class QpidJmsRuntimeConfig { */ @ConfigItem public Optional password; + + /** + * Whether to wrap a ConnectionFactory by ConnectionFactoryWrapper which could be introduced by other extensions, + * such as quarkus-pooled-jms to provide pooling capability + */ + @ConfigItem(defaultValue = "false") + public boolean wrap; }