diff --git a/integration-tests/jms-ibmmq-client/pom.xml b/integration-tests/jms-ibmmq-client/pom.xml index 79e06842964d..d103703a02c0 100644 --- a/integration-tests/jms-ibmmq-client/pom.xml +++ b/integration-tests/jms-ibmmq-client/pom.xml @@ -41,6 +41,12 @@ camel-quarkus-jms + + + org.apache.camel.quarkus + camel-quarkus-jta + + io.quarkiverse.messaginghub diff --git a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java index a550748b96c0..50317be3320c 100644 --- a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java +++ b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQConnectionFactory.java @@ -17,8 +17,11 @@ package org.apache.camel.quarkus.component.jms.ibmmq.it; import com.ibm.mq.jakarta.jms.MQConnectionFactory; +import com.ibm.mq.jakarta.jms.MQXAConnectionFactory; import com.ibm.msg.client.jakarta.wmq.WMQConstants; import io.quarkiverse.messaginghub.pooled.jms.PooledJmsWrapper; +import io.quarkus.arc.properties.IfBuildProperty; +import io.quarkus.arc.properties.UnlessBuildProperty; import jakarta.enterprise.inject.Produces; import jakarta.jms.ConnectionFactory; import org.eclipse.microprofile.config.ConfigProvider; @@ -26,8 +29,22 @@ public class IBMMQConnectionFactory { @Produces + @UnlessBuildProperty(name = "quarkus.pooled-jms.transaction", stringValue = "xa") public ConnectionFactory createConnectionFactory(PooledJmsWrapper wrapper) { MQConnectionFactory mq = new MQConnectionFactory(); + setupMQ(mq); + return wrapper.wrapConnectionFactory(mq); + } + + @Produces + @IfBuildProperty(name = "quarkus.pooled-jms.transaction", stringValue = "xa") + public ConnectionFactory createXAConnectionFactory(PooledJmsWrapper wrapper) { + MQXAConnectionFactory mq = new MQXAConnectionFactory(); + setupMQ(mq); + return wrapper.wrapConnectionFactory(mq); + } + + private void setupMQ(MQConnectionFactory mq) { try { mq.setHostName(ConfigProvider.getConfig().getValue("ibm.mq.host", String.class)); mq.setPort(ConfigProvider.getConfig().getValue("ibm.mq.port", Integer.class)); @@ -41,6 +58,6 @@ public ConnectionFactory createConnectionFactory(PooledJmsWrapper wrapper) { } catch (Exception e) { throw new RuntimeException("Unable to create new IBM MQ connection factory", e); } - return wrapper.wrapConnectionFactory(mq); + } } diff --git a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java index 0fc99e049589..6f36113ee876 100644 --- a/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java +++ b/integration-tests/jms-ibmmq-client/src/main/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQResource.java @@ -24,8 +24,11 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; @ApplicationScoped @Path("/messaging/jms/ibmmq") @@ -36,6 +39,12 @@ public class IBMMQResource { @Produce("jms:queue:testPojoProducer") ProducerTemplate pojoProducer; + @Inject + CamelContext context; + + @Inject + ProducerTemplate producerTemplate; + @GET @Path("/connection/factory") @Produces(MediaType.TEXT_PLAIN) @@ -48,4 +57,45 @@ public String connectionFactoryImplementation() { public void pojoProducer(String message) { pojoProducer.sendBody(message); } + + @POST + @Path("/xa") + public String testXA(String message) throws Exception { + MockEndpoint mockEndpoint = context.getEndpoint("mock:xaResult", MockEndpoint.class); + + mockEndpoint.reset(); + if (isValid(message)) { + mockEndpoint.expectedMessageCount(1); + } else { + mockEndpoint.expectedMessageCount(0); + } + + try { + producerTemplate.sendBody("direct:xa", message); + } catch (CamelExecutionException e) { + // ignore the exception and we will check the mock:xaResult + } + mockEndpoint.assertIsSatisfied(5000); + + if (isValid(message)) { + return mockEndpoint.getExchanges().get(0).getIn().getBody(String.class); + } else { + return "rollback"; + } + } + + @Path("/routes/startXA") + @GET + public void startRouteXA() { + try { + context.getRouteController().startRoute("xa"); + context.getRouteController().startRoute("xaConsumer"); + } catch (Exception e) { + throw new RuntimeException("Unable to start xa route", e); + } + } + + private boolean isValid(String message) { + return !message.startsWith("fail"); + } } diff --git a/integration-tests/jms-ibmmq-client/src/main/resources/application.properties b/integration-tests/jms-ibmmq-client/src/main/resources/application.properties index b32e2e1086e8..a7b986182d4f 100644 --- a/integration-tests/jms-ibmmq-client/src/main/resources/application.properties +++ b/integration-tests/jms-ibmmq-client/src/main/resources/application.properties @@ -17,3 +17,4 @@ # # Only enabled with IBMMQPoolingTest quarkus.pooled-jms.pooling.enabled=false +quarkus.pooled-jms.transaction=disabled diff --git a/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java new file mode 100644 index 000000000000..4a04dbb61745 --- /dev/null +++ b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/IBMMQXATest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.quarkus.component.jms.ibmmq.it; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; +import io.restassured.RestAssured; +import org.apache.camel.quarkus.component.jms.ibmmq.support.IBMMQDestinations; +import org.apache.camel.quarkus.component.jms.ibmmq.support.IBMMQTestResource; +import org.eclipse.microprofile.config.ConfigProvider; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import static org.hamcrest.core.Is.is; + +@QuarkusTest +@QuarkusTestResource(IBMMQTestResource.class) +@EnabledIfSystemProperty(named = "ibm.mq.container.license", matches = "accept") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestProfile(JmsXAEnabled.class) +public class IBMMQXATest { + private IBMMQDestinations destinations; + + /** + * IBM MQ needs to have the destinations created before you can use them. + *

+ * This method is called after the routes start, so the routes will print a warning first that the destinations don't + * exist, only then they are + * created using this method + * + * @param test test + */ + @BeforeAll + public void startRoutes(TestInfo test) { + destinations.createQueue("xa"); + + RestAssured.given() + // see AbstractMessagingTest#beforeAll + .port(ConfigProvider.getConfig().getValue("quarkus.http.test-port", Integer.class)) + .get("/messaging/jms/ibmmq/routes/startXA"); + } + + @Test + public void connectionFactoryImplementation() { + RestAssured.get("/messaging/jms/ibmmq/connection/factory") + .then() + .statusCode(200) + .body(is("org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory")); + } + + @Test + public void testJmsXACommit() { + RestAssured.given() + .body("commit") + .post("/messaging/jms/ibmmq/xa") + .then() + .statusCode(200) + .body(is("commit")); + } + + @Test + public void testJmsXARollback() { + RestAssured.given() + .body("fail") + .post("/messaging/jms/ibmmq/xa") + .then() + .statusCode(200) + .body(is("rollback")); + } +} diff --git a/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java new file mode 100644 index 000000000000..dd6049e083dc --- /dev/null +++ b/integration-tests/jms-ibmmq-client/src/test/java/org/apache/camel/quarkus/component/jms/ibmmq/it/JmsXAEnabled.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.quarkus.component.jms.ibmmq.it; + +import java.util.HashMap; +import java.util.Map; + +import io.quarkus.test.junit.QuarkusTestProfile; + +public class JmsXAEnabled implements QuarkusTestProfile { + @Override + public Map getConfigOverrides() { + Map props = new HashMap<>(); + props.put("quarkus.pooled-jms.pooling.enabled", "true"); + props.put("quarkus.pooled-jms.max-connections", "8"); + props.put("quarkus.pooled-jms.transaction", "xa"); + props.put("quarkus.transaction-manager.enable-recovery", "true"); + return props; + } +} diff --git a/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java b/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java index 8c46757cd25a..0a1647fe1cd3 100644 --- a/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java +++ b/integration-tests/messaging/jms/src/main/java/org/apache/camel/quarkus/messaging/jms/JmsRoutes.java @@ -19,8 +19,11 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.transaction.TransactionManager; +import org.apache.camel.BindToRegistry; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.quarkus.component.messaging.it.util.scheme.ComponentScheme; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.jta.JtaTransactionManager; @ApplicationScoped public class JmsRoutes extends RouteBuilder { @@ -44,11 +47,13 @@ public void configure() throws Exception { .bean("destinationHeaderSetter") .toF("%s:queue:override", componentScheme); - fromF("%s:queue:xa", componentScheme) + fromF("%s:queue:xa?transactionManager=#jtaTransactionManager", componentScheme) + .routeId("xaConsumer") .log("Received message ${body}") .to("mock:xaResult"); from("direct:xa") + .routeId("xa") .transacted() .process(x -> { transactionManager.getTransaction().enlistResource(new DummyXAResource()); @@ -64,4 +69,9 @@ public void configure() throws Exception { .log("Message added: ${body}") .endChoice(); } + + @BindToRegistry("jtaTransactionManager") + public PlatformTransactionManager getTransactionManager() { + return new JtaTransactionManager(transactionManager); + } }