Skip to content

Commit

Permalink
Fix amqphub#22 to add pooling support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhfeng committed Apr 20, 2023
1 parent e097b7c commit 724174b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 2 deletions.
4 changes: 4 additions & 0 deletions deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jms-spi-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-netty-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package org.amqphub.quarkus.qpid.jms.deployment;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.jms.ConnectionFactory;

import org.amqphub.quarkus.qpid.jms.runtime.ConnectionFactoryWrapper;
import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsProducer;
import org.amqphub.quarkus.qpid.jms.runtime.QpidJmsRecorder;

import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
Expand Down Expand Up @@ -50,15 +56,22 @@
import org.apache.qpid.proton.engine.impl.TransportImpl;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.jms.spi.deployment.ConnectionFactoryWrapperBuildItem;

import java.util.Optional;
import java.util.function.Function;

public class QpidJmsProcessor {
private static final String QPID_JMS = "qpid-jms";
Expand All @@ -80,6 +93,25 @@ public void enableSecurityServices(BuildProducer<FeatureBuildItem> feature,
AdditionalBeanBuildItem registerBean() {
return AdditionalBeanBuildItem.unremovableOf(QpidJmsProducer.class);
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void connectionFactoryWrapper(Optional<ConnectionFactoryWrapperBuildItem> connectionFactoryWrapper,
QpidJmsRecorder recorder,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanProducer) {
Optional<Function<ConnectionFactory, Object>> wrapper = Optional.empty();
if (connectionFactoryWrapper.isPresent()) {
wrapper = Optional.of(connectionFactoryWrapper.get().getWrapper());
}

SyntheticBeanBuildItem.ExtendedBeanConfigurator configurator = SyntheticBeanBuildItem.configure(ConnectionFactoryWrapper.class)
.setRuntimeInit()
.defaultBean()
.scope(ApplicationScoped.class)
.runtimeValue(recorder.getConnectionFactoryWrapper(wrapper));

syntheticBeanProducer.produce(configurator.done());
}

@BuildStep
public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.amqphub.quarkus.qpid.jms.runtime;

import jakarta.jms.ConnectionFactory;

public interface ConnectionFactoryWrapper {
ConnectionFactory wrap(ConnectionFactory connectionFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class QpidJmsProducer {
@Produces
@ApplicationScoped
@DefaultBean
public ConnectionFactory connectionFactory() {
return new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url);
public ConnectionFactory connectionFactory(ConnectionFactoryWrapper wrapper) {
return wrapper.wrap(new JmsConnectionFactory(config.username.orElse(null), config.password.orElse(null), config.url));
}

public QpidJmsRuntimeConfig getConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.amqphub.quarkus.qpid.jms.runtime;

import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import jakarta.jms.ConnectionFactory;
import org.amqphub.quarkus.qpid.jms.runtime.ConnectionFactoryWrapper;

import java.util.Optional;
import java.util.function.Function;

@Recorder
public class QpidJmsRecorder {
public RuntimeValue<ConnectionFactoryWrapper> getConnectionFactoryWrapper(Optional<Function<ConnectionFactory, Object>> wrapper) {
return new RuntimeValue<>(connectionFactory -> {
if (wrapper.isPresent()) {
return (ConnectionFactory) wrapper.get().apply(connectionFactory);
} else {
return connectionFactory;
}
});
}
}

0 comments on commit 724174b

Please sign in to comment.