From d5d75654fcdccd59681dba087d9cb37a748df1be Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 15 Feb 2024 18:01:17 -0500 Subject: [PATCH] Fail if an XA TXN manager does not enlist the XA session in the TXN If the TXN manager does not enlist the XASession into the TXN then we should throw an error to indicate that the session could not enter the TXN and we need to ensure that on an error in enlistment or in registering the synchronization we return the session to the pool and properly reset the connection references so that session don't get trapped in the loaned state. This closes #32 (cherry picked from commit c79b2b37d270b21309c5b9da95824e1aab8dfd7c) (cherry picked from commit 77cda3aff746288c6cb91982fe1c6e4bd4ca24a6) --- .../pooled/jms/XAConnectionPoolTest.java | 4 +- .../pooled/jms/JmsPoolSession.java | 113 +++++++++------ .../pooled/jms/pool/PooledXAConnection.java | 57 ++++++-- .../pooled/jms/JmsPoolXAConnectionTest.java | 131 ++++++++++++++++++ .../pooled/jms/mock/MockJMSConnection.java | 2 +- .../jms/mock/MockJMSConnectionFactory.java | 6 +- .../pooled/jms/mock/MockJMSXAConnection.java | 62 +++++++++ .../jms/mock/MockJMSXAConnectionFactory.java | 53 +++++++ .../pooled/jms/mock/MockJMSXASession.java | 42 ++++++ 9 files changed, 406 insertions(+), 64 deletions(-) create mode 100644 pooled-jms/src/test/java/org/messaginghub/pooled/jms/JmsPoolXAConnectionTest.java create mode 100644 pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnection.java create mode 100644 pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnectionFactory.java create mode 100644 pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXASession.java diff --git a/pooled-jms-interop-tests/pooled-jms-activemq-tests/src/test/java/org/messaginghub/pooled/jms/XAConnectionPoolTest.java b/pooled-jms-interop-tests/pooled-jms-activemq-tests/src/test/java/org/messaginghub/pooled/jms/XAConnectionPoolTest.java index b4e55bd..4f43527 100644 --- a/pooled-jms-interop-tests/pooled-jms-activemq-tests/src/test/java/org/messaginghub/pooled/jms/XAConnectionPoolTest.java +++ b/pooled-jms-interop-tests/pooled-jms-activemq-tests/src/test/java/org/messaginghub/pooled/jms/XAConnectionPoolTest.java @@ -107,7 +107,7 @@ public boolean enlistResource(XAResource xaRes) throws IllegalStateException, Ro } catch (XAException e) { e.printStackTrace(); } - return false; + return true; } @Override @@ -195,7 +195,7 @@ public void commit() throws HeuristicMixedException, HeuristicRollbackException, @Override public int getStatus() throws SystemException { - return Status.STATUS_ACTIVE; + return Status.STATUS_NO_TRANSACTION; } @Override diff --git a/pooled-jms/src/main/java/org/messaginghub/pooled/jms/JmsPoolSession.java b/pooled-jms/src/main/java/org/messaginghub/pooled/jms/JmsPoolSession.java index cac8bff..fb7a189 100644 --- a/pooled-jms/src/main/java/org/messaginghub/pooled/jms/JmsPoolSession.java +++ b/pooled-jms/src/main/java/org/messaginghub/pooled/jms/JmsPoolSession.java @@ -80,52 +80,8 @@ public JmsPoolSession(PooledSessionKey key, PooledSessionHolder sessionHolder, K @Override public void close() throws JMSException { - if (ignoreClose) { - return; - } - - if (closed.compareAndSet(false, true)) { - boolean invalidate = false; - try { - // lets reset the session - getInternalSession().setMessageListener(null); - - // Close any consumers, producers and browsers that may have been created. - for (MessageConsumer consumer : consumers) { - consumer.close(); - } - - for (QueueBrowser browser : browsers) { - browser.close(); - } - - for (MessageProducer producer : producers) { - producer.close(); - } - - consumers.clear(); - producers.clear(); - browsers.clear(); - - if (transactional && !isXa) { - try { - getInternalSession().rollback(); - } catch (JMSException e) { - invalidate = true; - LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); - } - } - } catch (JMSException ex) { - invalidate = true; - LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex); - } finally { - consumers.clear(); - browsers.clear(); - for (JmsPoolSessionEventListener listener : this.sessionEventListeners) { - listener.onSessionClosed(this); - } - sessionEventListeners.clear(); - } + if (!ignoreClose && closed.compareAndSet(false, true)) { + boolean invalidate = cleanupSession(); if (invalidate) { // lets close the session and not put the session back into the pool @@ -157,6 +113,71 @@ public void close() throws JMSException { } } + private boolean cleanupSession() { + Exception cleanupError = null; + + try { + getInternalSession().setMessageListener(null); + } catch (JMSException e) { + cleanupError = cleanupError == null ? e : cleanupError; + } + + // Close any consumers, producers and browsers that may have been created. + for (MessageConsumer consumer : consumers) { + try { + consumer.close(); + } catch (JMSException e) { + LOG.trace("Caught exception trying close a consumer, will invalidate. " + e, e); + cleanupError = cleanupError == null ? e : cleanupError; + } + } + + for (QueueBrowser browser : browsers) { + try { + browser.close(); + } catch (JMSException e) { + LOG.trace("Caught exception trying close a browser, will invalidate. " + e, e); + cleanupError = cleanupError == null ? e : cleanupError; + } + } + + for (MessageProducer producer : producers) { + try { + producer.close(); + } catch (JMSException e) { + LOG.trace("Caught exception trying close a producer, will invalidate. " + e, e); + cleanupError = cleanupError == null ? e : cleanupError; + } + } + + if (transactional && !isXa) { + try { + getInternalSession().rollback(); + } catch (JMSException e) { + LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); + cleanupError = cleanupError == null ? e : cleanupError; + } + } + + producers.clear(); + consumers.clear(); + browsers.clear(); + + for (JmsPoolSessionEventListener listener : this.sessionEventListeners) { + try { + listener.onSessionClosed(this); + } catch (Exception e) { + cleanupError = cleanupError == null ? e : cleanupError; + } + } + + if (cleanupError != null) { + LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + cleanupError, cleanupError); + } + + return cleanupError != null; + } + //----- Destination factory methods --------------------------------------// @Override diff --git a/pooled-jms/src/main/java/org/messaginghub/pooled/jms/pool/PooledXAConnection.java b/pooled-jms/src/main/java/org/messaginghub/pooled/jms/pool/PooledXAConnection.java index 2cf41c7..7c034d3 100644 --- a/pooled-jms/src/main/java/org/messaginghub/pooled/jms/pool/PooledXAConnection.java +++ b/pooled-jms/src/main/java/org/messaginghub/pooled/jms/pool/PooledXAConnection.java @@ -16,12 +16,15 @@ */ package org.messaginghub.pooled.jms.pool; +import java.util.concurrent.atomic.AtomicBoolean; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.XAConnection; import javax.transaction.RollbackException; import javax.transaction.Status; +import javax.transaction.Synchronization; import javax.transaction.SystemException; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; @@ -51,6 +54,7 @@ protected Session makeSession(PooledSessionKey key) throws JMSException { public Session createSession(boolean transacted, int ackMode) throws JMSException { try { boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); + if (isXa) { // if the xa tx aborts inflight we don't want to auto create a // local transaction or auto ack @@ -63,16 +67,29 @@ public Session createSession(boolean transacted, int ackMode) throws JMSExceptio ackMode = Session.AUTO_ACKNOWLEDGE; } } - JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode); + + final JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode); + + session.setIgnoreClose(isXa); + session.setIsXa(isXa); + if (isXa) { - session.setIgnoreClose(true); - session.setIsXa(true); - transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); incrementReferenceCount(); - transactionManager.getTransaction().enlistResource(createXaResource(session)); - } else { - session.setIgnoreClose(false); + + final JmsPooledXASessionSynchronization sync = new JmsPooledXASessionSynchronization(session); + + try { + transactionManager.getTransaction().registerSynchronization(sync); + + if (!transactionManager.getTransaction().enlistResource(createXaResource(session))) { + throw new JMSException("Enlistment of Pooled Session into transaction failed"); + } + } catch (Exception ex) { + sync.close(); + throw ex; + } } + return session; } catch (RollbackException e) { final JMSException jmsException = new JMSException("Rollback Exception"); @@ -89,14 +106,29 @@ protected XAResource createXaResource(JmsPoolSession session) throws JMSExceptio return session.getXAResource(); } - protected class Synchronization implements javax.transaction.Synchronization { + protected class JmsPooledXASessionSynchronization implements Synchronization { + + private final AtomicBoolean closed = new AtomicBoolean(); - private final JmsPoolSession session; + private JmsPoolSession session; - private Synchronization(JmsPoolSession session) { + private JmsPooledXASessionSynchronization(JmsPoolSession session) { this.session = session; } + public void close() throws JMSException { + if (closed.compareAndSet(false, true)) { + // This will return session to the pool. + session.setIgnoreClose(false); + try { + session.close(); + } finally { + session = null; + decrementReferenceCount(); + } + } + } + @Override public void beforeCompletion() { } @@ -104,10 +136,7 @@ public void beforeCompletion() { @Override public void afterCompletion(int status) { try { - // This will return session to the pool. - session.setIgnoreClose(false); - session.close(); - decrementReferenceCount(); + close(); } catch (JMSException e) { throw new RuntimeException(e); } diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/JmsPoolXAConnectionTest.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/JmsPoolXAConnectionTest.java new file mode 100644 index 0000000..6c1532a --- /dev/null +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/JmsPoolXAConnectionTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.messaginghub.pooled.jms; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import javax.jms.Connection; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; +import javax.jms.XASession; +import javax.transaction.RollbackException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; + +import org.junit.Test; +import org.messaginghub.pooled.jms.mock.MockJMSConnectionMetaData; +import org.messaginghub.pooled.jms.mock.MockJMSXAConnectionFactory; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for the XA pooled connection handling. + */ +public class JmsPoolXAConnectionTest extends JmsPoolTestSupport { + + protected JmsPoolXAConnectionFactory xaCF; + + @Mock + TransactionManager txManager; + + @Mock + Transaction txn; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + + when(txManager.getTransaction()).thenReturn(txn); + when(txn.enlistResource(any())).thenReturn(true); + + factory = new MockJMSXAConnectionFactory(); + + xaCF = new JmsPoolXAConnectionFactory(); + xaCF.setTransactionManager(txManager); + xaCF.setConnectionFactory(factory); + xaCF.setMaxConnections(1); + } + + @Override + public void tearDown() throws Exception { + try { + xaCF.stop(); + } catch (Exception ex) { + // ignored + } + + super.tearDown(); + } + + @Test + public void testGetConnectionMetaData() throws Exception { + Connection connection = xaCF.createConnection(); + ConnectionMetaData metaData = connection.getMetaData(); + + assertNotNull(metaData); + assertSame(metaData, MockJMSConnectionMetaData.INSTANCE); + } + + @Test + public void testCreateXASession() throws Exception { + JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection(); + XASession session = (XASession) connection.createSession(); + + when(txn.enlistResource(any())).thenReturn(true); + + assertNotNull(session); + + assertEquals(0, connection.getNumtIdleSessions()); + session.close(); + + // Session should be ignoring close at this stage + assertEquals(0, connection.getNumtIdleSessions()); + } + + @Test + public void testCreateXASessionFailsOnAddSynchronization() throws Exception { + JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection(); + + doThrow(RollbackException.class).when(txn).registerSynchronization(any()); + when(txn.enlistResource(any())).thenReturn(true); + + assertThrows(JMSException.class, () -> connection.createSession()); + + // Session not should be ignoring close at this stage + assertEquals(1, connection.getNumtIdleSessions()); + } + + @Test + public void testCreateXASessionFailsOnEnlist() throws Exception { + JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection(); + + when(txn.enlistResource(any())).thenReturn(false); + + assertThrows(JMSException.class, () -> connection.createSession()); + + // Session not should be ignoring close at this stage + assertEquals(1, connection.getNumtIdleSessions()); + } +} diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnection.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnection.java index d92f0b6..5b7798b 100644 --- a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnection.java +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnection.java @@ -450,7 +450,7 @@ MockJMSConnection initialize() throws JMSException { return this; } - private void ensureConnected() throws JMSException { + protected void ensureConnected() throws JMSException { if (isConnected() || closed.get()) { return; } diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnectionFactory.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnectionFactory.java index 731705d..23740c8 100644 --- a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnectionFactory.java +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSConnectionFactory.java @@ -82,7 +82,7 @@ private MockJMSConnection createMockConnection(String username, String password) throw new JMSSecurityException(user.getFailureCause()); } - MockJMSConnection connection = new MockJMSConnection(user); + MockJMSConnection connection = createMockConnectionInstance(user); if (clientID != null && !clientID.isEmpty()) { connection.setClientID(clientID, true); @@ -101,6 +101,10 @@ private MockJMSConnection createMockConnection(String username, String password) return connection; } + protected MockJMSConnection createMockConnectionInstance(MockJMSUser user ) { + return new MockJMSConnection(user); + } + //----- JMS Context Creation Methods -------------------------------------// @Override diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnection.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnection.java new file mode 100644 index 0000000..2493559 --- /dev/null +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnection.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.messaginghub.pooled.jms.mock; + +import javax.jms.JMSException; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicSession; +import javax.jms.XAConnection; +import javax.jms.XASession; + +/** + * Simple Mock JMS XAConnection implementation for unit tests + */ +public class MockJMSXAConnection extends MockJMSConnection implements XAConnection { + + public MockJMSXAConnection(MockJMSUser user) { + super(user); + } + + @Override + public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { + throw new UnsupportedOperationException("Not yet implemented"); + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + checkClosedOrFailed(); + ensureConnected(); + int ackMode = getSessionAcknowledgeMode(true, Session.SESSION_TRANSACTED); + MockJMSXASession result = new MockJMSXASession(getNextSessionId(), ackMode, this); + addSession(result); + if (isStarted()) { + result.start(); + } + return result; + } + + @Override + public XASession createXASession() throws JMSException { + return (XASession) createSession(true, Session.SESSION_TRANSACTED); + } +} diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnectionFactory.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnectionFactory.java new file mode 100644 index 0000000..bf75407 --- /dev/null +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXAConnectionFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.messaginghub.pooled.jms.mock; + +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XAJMSContext; + +/** + * A simple mock JMS XA ConnectionFactory implementation for tests + */ +public class MockJMSXAConnectionFactory extends MockJMSConnectionFactory implements XAConnectionFactory { + + @Override + public XAConnection createXAConnection() throws JMSException { + return (XAConnection) super.createConnection(); + } + + @Override + public XAConnection createXAConnection(String userName, String password) throws JMSException { + return (XAConnection) super.createConnection(userName, password); + } + + @Override + public XAJMSContext createXAContext() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public XAJMSContext createXAContext(String userName, String password) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected MockJMSConnection createMockConnectionInstance(MockJMSUser user ) { + return new MockJMSXAConnection(user); + } +} diff --git a/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXASession.java b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXASession.java new file mode 100644 index 0000000..8e20fc2 --- /dev/null +++ b/pooled-jms/src/test/java/org/messaginghub/pooled/jms/mock/MockJMSXASession.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.messaginghub.pooled.jms.mock; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; + +/** + * Mock JMS XA Session for use in unit tests + */ +public class MockJMSXASession extends MockJMSSession implements XASession { + + public MockJMSXASession(String sessionId, int sessionMode, MockJMSConnection connection) { + super(sessionId, sessionMode, connection); + } + + @Override + public Session getSession() throws JMSException { + return this; + } + + @Override + public XAResource getXAResource() { + return null; + } +}