Skip to content

Commit

Permalink
Fail if an XA TXN manager does not enlist the XA session in the TXN
Browse files Browse the repository at this point in the history
If the TXN manager does not enlist the XASession into the TXN then we
should throw an error to indicate that the session could not enter the
TXN and we need to ensure that on an error in enlistment or in
registering the synchronization we return the session to the pool and
properly reset the connection references so that session don't get
trapped in the loaned state.

This closes #32

(cherry picked from commit c79b2b3)
(cherry picked from commit 77cda3a)
  • Loading branch information
tabish121 committed Feb 15, 2024
1 parent eec12bb commit d5d7565
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public boolean enlistResource(XAResource xaRes) throws IllegalStateException, Ro
} catch (XAException e) {
e.printStackTrace();
}
return false;
return true;
}

@Override
Expand Down Expand Up @@ -195,7 +195,7 @@ public void commit() throws HeuristicMixedException, HeuristicRollbackException,

@Override
public int getStatus() throws SystemException {
return Status.STATUS_ACTIVE;
return Status.STATUS_NO_TRANSACTION;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,8 @@ public JmsPoolSession(PooledSessionKey key, PooledSessionHolder sessionHolder, K

@Override
public void close() throws JMSException {
if (ignoreClose) {
return;
}

if (closed.compareAndSet(false, true)) {
boolean invalidate = false;
try {
// lets reset the session
getInternalSession().setMessageListener(null);

// Close any consumers, producers and browsers that may have been created.
for (MessageConsumer consumer : consumers) {
consumer.close();
}

for (QueueBrowser browser : browsers) {
browser.close();
}

for (MessageProducer producer : producers) {
producer.close();
}

consumers.clear();
producers.clear();
browsers.clear();

if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
invalidate = true;
LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
}
}
} catch (JMSException ex) {
invalidate = true;
LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
} finally {
consumers.clear();
browsers.clear();
for (JmsPoolSessionEventListener listener : this.sessionEventListeners) {
listener.onSessionClosed(this);
}
sessionEventListeners.clear();
}
if (!ignoreClose && closed.compareAndSet(false, true)) {
boolean invalidate = cleanupSession();

if (invalidate) {
// lets close the session and not put the session back into the pool
Expand Down Expand Up @@ -157,6 +113,71 @@ public void close() throws JMSException {
}
}

private boolean cleanupSession() {
Exception cleanupError = null;

try {
getInternalSession().setMessageListener(null);
} catch (JMSException e) {
cleanupError = cleanupError == null ? e : cleanupError;
}

// Close any consumers, producers and browsers that may have been created.
for (MessageConsumer consumer : consumers) {
try {
consumer.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a consumer, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

for (QueueBrowser browser : browsers) {
try {
browser.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a browser, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

for (MessageProducer producer : producers) {
try {
producer.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a producer, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

producers.clear();
consumers.clear();
browsers.clear();

for (JmsPoolSessionEventListener listener : this.sessionEventListeners) {
try {
listener.onSessionClosed(this);
} catch (Exception e) {
cleanupError = cleanupError == null ? e : cleanupError;
}
}

if (cleanupError != null) {
LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + cleanupError, cleanupError);
}

return cleanupError != null;
}

//----- Destination factory methods --------------------------------------//

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.messaginghub.pooled.jms.pool;

import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
Expand Down Expand Up @@ -51,6 +54,7 @@ protected Session makeSession(PooledSessionKey key) throws JMSException {
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);

if (isXa) {
// if the xa tx aborts inflight we don't want to auto create a
// local transaction or auto ack
Expand All @@ -63,16 +67,29 @@ public Session createSession(boolean transacted, int ackMode) throws JMSExceptio
ackMode = Session.AUTO_ACKNOWLEDGE;
}
}
JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode);

final JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode);

session.setIgnoreClose(isXa);
session.setIsXa(isXa);

if (isXa) {
session.setIgnoreClose(true);
session.setIsXa(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
} else {
session.setIgnoreClose(false);

final JmsPooledXASessionSynchronization sync = new JmsPooledXASessionSynchronization(session);

try {
transactionManager.getTransaction().registerSynchronization(sync);

if (!transactionManager.getTransaction().enlistResource(createXaResource(session))) {
throw new JMSException("Enlistment of Pooled Session into transaction failed");
}
} catch (Exception ex) {
sync.close();
throw ex;
}
}

return session;
} catch (RollbackException e) {
final JMSException jmsException = new JMSException("Rollback Exception");
Expand All @@ -89,25 +106,37 @@ protected XAResource createXaResource(JmsPoolSession session) throws JMSExceptio
return session.getXAResource();
}

protected class Synchronization implements javax.transaction.Synchronization {
protected class JmsPooledXASessionSynchronization implements Synchronization {

private final AtomicBoolean closed = new AtomicBoolean();

private final JmsPoolSession session;
private JmsPoolSession session;

private Synchronization(JmsPoolSession session) {
private JmsPooledXASessionSynchronization(JmsPoolSession session) {
this.session = session;
}

public void close() throws JMSException {
if (closed.compareAndSet(false, true)) {
// This will return session to the pool.
session.setIgnoreClose(false);
try {
session.close();
} finally {
session = null;
decrementReferenceCount();
}
}
}

@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
decrementReferenceCount();
close();
} catch (JMSException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.messaginghub.pooled.jms;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

import javax.jms.Connection;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import javax.jms.XASession;
import javax.transaction.RollbackException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.junit.Test;
import org.messaginghub.pooled.jms.mock.MockJMSConnectionMetaData;
import org.messaginghub.pooled.jms.mock.MockJMSXAConnectionFactory;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/**
* Tests for the XA pooled connection handling.
*/
public class JmsPoolXAConnectionTest extends JmsPoolTestSupport {

protected JmsPoolXAConnectionFactory xaCF;

@Mock
TransactionManager txManager;

@Mock
Transaction txn;

@Override
public void setUp() throws Exception {
super.setUp();

MockitoAnnotations.openMocks(this);

when(txManager.getTransaction()).thenReturn(txn);
when(txn.enlistResource(any())).thenReturn(true);

factory = new MockJMSXAConnectionFactory();

xaCF = new JmsPoolXAConnectionFactory();
xaCF.setTransactionManager(txManager);
xaCF.setConnectionFactory(factory);
xaCF.setMaxConnections(1);
}

@Override
public void tearDown() throws Exception {
try {
xaCF.stop();
} catch (Exception ex) {
// ignored
}

super.tearDown();
}

@Test
public void testGetConnectionMetaData() throws Exception {
Connection connection = xaCF.createConnection();
ConnectionMetaData metaData = connection.getMetaData();

assertNotNull(metaData);
assertSame(metaData, MockJMSConnectionMetaData.INSTANCE);
}

@Test
public void testCreateXASession() throws Exception {
JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection();
XASession session = (XASession) connection.createSession();

when(txn.enlistResource(any())).thenReturn(true);

assertNotNull(session);

assertEquals(0, connection.getNumtIdleSessions());
session.close();

// Session should be ignoring close at this stage
assertEquals(0, connection.getNumtIdleSessions());
}

@Test
public void testCreateXASessionFailsOnAddSynchronization() throws Exception {
JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection();

doThrow(RollbackException.class).when(txn).registerSynchronization(any());
when(txn.enlistResource(any())).thenReturn(true);

assertThrows(JMSException.class, () -> connection.createSession());

// Session not should be ignoring close at this stage
assertEquals(1, connection.getNumtIdleSessions());
}

@Test
public void testCreateXASessionFailsOnEnlist() throws Exception {
JmsPoolConnection connection = (JmsPoolConnection) xaCF.createConnection();

when(txn.enlistResource(any())).thenReturn(false);

assertThrows(JMSException.class, () -> connection.createSession());

// Session not should be ignoring close at this stage
assertEquals(1, connection.getNumtIdleSessions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ MockJMSConnection initialize() throws JMSException {
return this;
}

private void ensureConnected() throws JMSException {
protected void ensureConnected() throws JMSException {
if (isConnected() || closed.get()) {
return;
}
Expand Down
Loading

0 comments on commit d5d7565

Please sign in to comment.