io.quarkiverse.messaginghub
diff --git a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java
index a550748b96c0..50317be3320c 100644
--- a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java
+++ b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java
@@ -17,8 +17,11 @@
package org.apache.camel.quarkus.component.jms.ibmmq.it;
import com.ibm.mq.jakarta.jms.MQConnectionFactory;
+import com.ibm.mq.jakarta.jms.MQXAConnectionFactory;
import com.ibm.msg.client.jakarta.wmq.WMQConstants;
import io.quarkiverse.messaginghub.pooled.jms.PooledJmsWrapper;
+import io.quarkus.arc.properties.IfBuildProperty;
+import io.quarkus.arc.properties.UnlessBuildProperty;
import jakarta.enterprise.inject.Produces;
import jakarta.jms.ConnectionFactory;
import org.eclipse.microprofile.config.ConfigProvider;
@@ -26,8 +29,22 @@
public class IBMMQConnectionFactory {
@Produces
+ @UnlessBuildProperty(name = "quarkus.pooled-jms.transaction", stringValue = "xa")
public ConnectionFactory createConnectionFactory(PooledJmsWrapper wrapper) {
MQConnectionFactory mq = new MQConnectionFactory();
+ setupMQ(mq);
+ return wrapper.wrapConnectionFactory(mq);
+ }
+
+ @Produces
+ @IfBuildProperty(name = "quarkus.pooled-jms.transaction", stringValue = "xa")
+ public ConnectionFactory createXAConnectionFactory(PooledJmsWrapper wrapper) {
+ MQXAConnectionFactory mq = new MQXAConnectionFactory();
+ setupMQ(mq);
+ return wrapper.wrapConnectionFactory(mq);
+ }
+
+ private void setupMQ(MQConnectionFactory mq) {
try {
mq.setHostName(ConfigProvider.getConfig().getValue("ibm.mq.host", String.class));
mq.setPort(ConfigProvider.getConfig().getValue("ibm.mq.port", Integer.class));
@@ -41,6 +58,6 @@ public ConnectionFactory createConnectionFactory(PooledJmsWrapper wrapper) {
} catch (Exception e) {
throw new RuntimeException("Unable to create new IBM MQ connection factory", e);
}
- return wrapper.wrapConnectionFactory(mq);
+
}
}
diff --git a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java
index 0fc99e049589..6f36113ee876 100644
--- a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java
+++ b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java
@@ -24,8 +24,11 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
@ApplicationScoped
@Path("/messaging/jms/ibmmq")
@@ -36,6 +39,12 @@ public class IBMMQResource {
@Produce("jms:queue:testPojoProducer")
ProducerTemplate pojoProducer;
+ @Inject
+ CamelContext context;
+
+ @Inject
+ ProducerTemplate producerTemplate;
+
@GET
@Path("/connection/factory")
@Produces(MediaType.TEXT_PLAIN)
@@ -48,4 +57,45 @@ public String connectionFactoryImplementation() {
public void pojoProducer(String message) {
pojoProducer.sendBody(message);
}
+
+ @POST
+ @Path("/xa")
+ public String testXA(String message) throws Exception {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:xaResult", MockEndpoint.class);
+
+ mockEndpoint.reset();
+ if (isValid(message)) {
+ mockEndpoint.expectedMessageCount(1);
+ } else {
+ mockEndpoint.expectedMessageCount(0);
+ }
+
+ try {
+ producerTemplate.sendBody("direct:xa", message);
+ } catch (CamelExecutionException e) {
+ // ignore the exception and we will check the mock:xaResult
+ }
+ mockEndpoint.assertIsSatisfied(5000);
+
+ if (isValid(message)) {
+ return mockEndpoint.getExchanges().get(0).getIn().getBody(String.class);
+ } else {
+ return "rollback";
+ }
+ }
+
+ @Path("/routes/startXA")
+ @GET
+ public void startRouteXA() {
+ try {
+ context.getRouteController().startRoute("xa");
+ context.getRouteController().startRoute("xaConsumer");
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to start xa route", e);
+ }
+ }
+
+ private boolean isValid(String message) {
+ return !message.startsWith("fail");
+ }
}
diff --git a/integration-tests/jms-ibmmq-client/src/main/resources/application.properties b/integration-tests/jms-ibmmq-client/src/main/resources/application.properties
index b32e2e1086e8..a7b986182d4f 100644
--- a/integration-tests/jms-ibmmq-client/src/main/resources/application.properties
+++ b/integration-tests/jms-ibmmq-client/src/main/resources/application.properties
@@ -17,3 +17,4 @@
#
# Only enabled with IBMMQPoolingTest
quarkus.pooled-jms.pooling.enabled=false
+quarkus.pooled-jms.transaction=disabled
diff --git a/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java
new file mode 100644
index 000000000000..4a04dbb61745
--- /dev/null
+++ b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.jms.ibmmq.it;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.TestProfile;
+import io.restassured.RestAssured;
+import org.apache.camel.quarkus.component.jms.ibmmq.support.IBMMQDestinations;
+import org.apache.camel.quarkus.component.jms.ibmmq.support.IBMMQTestResource;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import static org.hamcrest.core.Is.is;
+
+@QuarkusTest
+@QuarkusTestResource(IBMMQTestResource.class)
+@EnabledIfSystemProperty(named = "ibm.mq.container.license", matches = "accept")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestProfile(JmsXAEnabled.class)
+public class IBMMQXATest {
+ private IBMMQDestinations destinations;
+
+ /**
+ * IBM MQ needs to have the destinations created before you can use them.
+ *
+ * This method is called after the routes start, so the routes will print a warning first that the destinations don't
+ * exist, only then they are
+ * created using this method
+ *
+ * @param test test
+ */
+ @BeforeAll
+ public void startRoutes(TestInfo test) {
+ destinations.createQueue("xa");
+
+ RestAssured.given()
+ // see AbstractMessagingTest#beforeAll
+ .port(ConfigProvider.getConfig().getValue("quarkus.http.test-port", Integer.class))
+ .get("/messaging/jms/ibmmq/routes/startXA");
+ }
+
+ @Test
+ public void connectionFactoryImplementation() {
+ RestAssured.get("/messaging/jms/ibmmq/connection/factory")
+ .then()
+ .statusCode(200)
+ .body(is("org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory"));
+ }
+
+ @Test
+ public void testJmsXACommit() {
+ RestAssured.given()
+ .body("commit")
+ .post("/messaging/jms/ibmmq/xa")
+ .then()
+ .statusCode(200)
+ .body(is("commit"));
+ }
+
+ @Test
+ public void testJmsXARollback() {
+ RestAssured.given()
+ .body("fail")
+ .post("/messaging/jms/ibmmq/xa")
+ .then()
+ .statusCode(200)
+ .body(is("rollback"));
+ }
+}
diff --git a/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java
new file mode 100644
index 000000000000..dd6049e083dc
--- /dev/null
+++ b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.jms.ibmmq.it;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.quarkus.test.junit.QuarkusTestProfile;
+
+public class JmsXAEnabled implements QuarkusTestProfile {
+ @Override
+ public Map getConfigOverrides() {
+ Map props = new HashMap<>();
+ props.put("quarkus.pooled-jms.pooling.enabled", "true");
+ props.put("quarkus.pooled-jms.max-connections", "8");
+ props.put("quarkus.pooled-jms.transaction", "xa");
+ props.put("quarkus.transaction-manager.enable-recovery", "true");
+ return props;
+ }
+}
diff --git a/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java b/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java
index 8c46757cd25a..0a1647fe1cd3 100644
--- a/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java
+++ b/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java
@@ -19,8 +19,11 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.TransactionManager;
+import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.quarkus.component.messaging.it.util.scheme.ComponentScheme;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.jta.JtaTransactionManager;
@ApplicationScoped
public class JmsRoutes extends RouteBuilder {
@@ -44,11 +47,13 @@ public void configure() throws Exception {
.bean("destinationHeaderSetter")
.toF("%s:queue:override", componentScheme);
- fromF("%s:queue:xa", componentScheme)
+ fromF("%s:queue:xa?transactionManager=#jtaTransactionManager", componentScheme)
+ .routeId("xaConsumer")
.log("Received message ${body}")
.to("mock:xaResult");
from("direct:xa")
+ .routeId("xa")
.transacted()
.process(x -> {
transactionManager.getTransaction().enlistResource(new DummyXAResource());
@@ -64,4 +69,9 @@ public void configure() throws Exception {
.log("Message added: ${body}")
.endChoice();
}
+
+ @BindToRegistry("jtaTransactionManager")
+ public PlatformTransactionManager getTransactionManager() {
+ return new JtaTransactionManager(transactionManager);
+ }
}