Skip to content

Commit

Permalink
Fix #22 to add pooling support
Browse files Browse the repository at this point in the history
(cherry picked from commit 5410b28) with fixups
  • Loading branch information
zhfeng authored and gemmellr committed Apr 26, 2023
1 parent cae606c commit 900b910
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 1 deletion.
4 changes: 4 additions & 0 deletions deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jms-spi-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-netty-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package org.amqphub.quarkus.qpid.jms.deployment;

import javax.enterprise.context.ApplicationScoped;
import javax.jms.ConnectionFactory;

import org.amqphub.quarkus.qpid.jms.runtime.ConnectionFactoryWrapper;
import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsProducer;
import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsRecorder;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
Expand Down Expand Up @@ -50,15 +56,22 @@
import org.apache.qpid.proton.engine.impl.TransportImpl;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.jms.spi.deployment.ConnectionFactoryWrapperBuildItem;

import java.util.Optional;
import java.util.function.Function;

public class QpidJmsProcessor {
private static final String QPID_JMS = "qpid-jms";
Expand All @@ -81,6 +94,23 @@ AdditionalBeanBuildItem registerBean() {
return AdditionalBeanBuildItem.unremovableOf(QpidJmsProducer.class);
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void connectionFactoryWrapper(Optional<ConnectionFactoryWrapperBuildItem> connectionFactoryWrapper,
QpidJmsRecorder recorder,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanProducer) {
if (connectionFactoryWrapper.isPresent()) {
Function<ConnectionFactory, Object> wrapper = connectionFactoryWrapper.get().getWrapper();
SyntheticBeanBuildItem.ExtendedBeanConfigurator configurator = SyntheticBeanBuildItem.configure(ConnectionFactoryWrapper.class)
.setRuntimeInit()
.defaultBean()
.unremovable()
.scope(ApplicationScoped.class)
.runtimeValue(recorder.getConnectionFactoryWrapper(wrapper));
syntheticBeanProducer.produce(configurator.done());
}
}

@BuildStep
public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<RuntimeInitializedClassBuildItem> delayedInitialisation,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.amqphub.quarkus.qpid.jms.runtime;

import javax.jms.ConnectionFactory;

public interface ConnectionFactoryWrapper {
ConnectionFactory wrap(ConnectionFactory connectionFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.amqphub.quarkus.qpid.jms.runtime;

import io.quarkus.arc.Arc;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
Expand All @@ -34,7 +35,14 @@ public class QpidJmsProducer {
@ApplicationScoped
@DefaultBean
public ConnectionFactory connectionFactory() {
return new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url);
ConnectionFactory connectionFactory = new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url);
ConnectionFactoryWrapper wrapper = Arc.container().instance(ConnectionFactoryWrapper.class).get();

if (config.wrap && wrapper != null) {
return wrapper.wrap(connectionFactory);
} else {
return connectionFactory;
}
}

public QpidJmsRuntimeConfig getConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.amqphub.quarkus.qpid.jms.runtime;

import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import javax.jms.ConnectionFactory;

import java.util.function.Function;

@Recorder
public class QpidJmsRecorder {
public RuntimeValue<ConnectionFactoryWrapper> getConnectionFactoryWrapper(Function<ConnectionFactory, Object> wrapper) {
return new RuntimeValue<>(connectionFactory -> (ConnectionFactory) wrapper.apply(connectionFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ public class QpidJmsRuntimeConfig {
*/
@ConfigItem
public Optional<String> password;

/**
* Whether to wrap a ConnectionFactory by ConnectionFactoryWrapper which could be introduced by other extensions,
* such as quarkus-pooled-jms to provide pooling capability
*/
@ConfigItem(defaultValue = "false")
public boolean wrap;
}

0 comments on commit 900b910

Please sign in to comment.