Skip to content

Commit

Permalink
Fail if an XA TXN manager does not enlist the XA session in the TXN
Browse files Browse the repository at this point in the history
If the TXN manager does not enlist the XASession into the TXN then we
should throw an error to indicate that the session could not enter the
TXN and we need to ensure that on an error in enlistment or in
registering the synchronization we return the session to the pool and
properly reset the connection references so that session don't get
trapped in the loaned state.

This closes #32
  • Loading branch information
tabish121 committed Nov 14, 2023
1 parent e394fd5 commit c79b2b3
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.transaction.xa.XAResource;

import org.apache.commons.pool2.KeyedObjectPool;
import org.messaginghub.pooled.jms.pool.PooledSessionHolder;
import org.messaginghub.pooled.jms.pool.PooledSessionKey;
import org.messaginghub.pooled.jms.util.JMSExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.jms.BytesMessage;
import jakarta.jms.Destination;
import jakarta.jms.IllegalStateException;
Expand All @@ -45,14 +54,6 @@
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import jakarta.jms.XASession;
import javax.transaction.xa.XAResource;

import org.apache.commons.pool2.KeyedObjectPool;
import org.messaginghub.pooled.jms.pool.PooledSessionHolder;
import org.messaginghub.pooled.jms.pool.PooledSessionKey;
import org.messaginghub.pooled.jms.util.JMSExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsPoolSession implements Session, TopicSession, QueueSession, XASession, AutoCloseable {

Expand Down Expand Up @@ -80,52 +81,8 @@ public JmsPoolSession(PooledSessionKey key, PooledSessionHolder sessionHolder, K

@Override
public void close() throws JMSException {
if (ignoreClose) {
return;
}

if (closed.compareAndSet(false, true)) {
boolean invalidate = false;
try {
// lets reset the session
getInternalSession().setMessageListener(null);

// Close any consumers, producers and browsers that may have been created.
for (MessageConsumer consumer : consumers) {
consumer.close();
}

for (QueueBrowser browser : browsers) {
browser.close();
}

for (MessageProducer producer : producers) {
producer.close();
}

consumers.clear();
producers.clear();
browsers.clear();

if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
invalidate = true;
LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
}
}
} catch (JMSException ex) {
invalidate = true;
LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
} finally {
consumers.clear();
browsers.clear();
for (JmsPoolSessionEventListener listener : this.sessionEventListeners) {
listener.onSessionClosed(this);
}
sessionEventListeners.clear();
}
if (!ignoreClose && closed.compareAndSet(false, true)) {
boolean invalidate = cleanupSession();

if (invalidate) {
// lets close the session and not put the session back into the pool
Expand Down Expand Up @@ -157,6 +114,71 @@ public void close() throws JMSException {
}
}

private boolean cleanupSession() {
Exception cleanupError = null;

try {
getInternalSession().setMessageListener(null);
} catch (JMSException e) {
cleanupError = cleanupError == null ? e : cleanupError;
}

// Close any consumers, producers and browsers that may have been created.
for (MessageConsumer consumer : consumers) {
try {
consumer.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a consumer, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

for (QueueBrowser browser : browsers) {
try {
browser.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a browser, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

for (MessageProducer producer : producers) {
try {
producer.close();
} catch (JMSException e) {
LOG.trace("Caught exception trying close a producer, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

if (transactional && !isXa) {
try {
getInternalSession().rollback();
} catch (JMSException e) {
LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
cleanupError = cleanupError == null ? e : cleanupError;
}
}

producers.clear();
consumers.clear();
browsers.clear();

for (JmsPoolSessionEventListener listener : this.sessionEventListeners) {
try {
listener.onSessionClosed(this);
} catch (Exception e) {
cleanupError = cleanupError == null ? e : cleanupError;
}
}

if (cleanupError != null) {
LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + cleanupError, cleanupError);
}

return cleanupError != null;
}

//----- Destination factory methods --------------------------------------//

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package org.messaginghub.pooled.jms.pool;

import java.util.concurrent.atomic.AtomicBoolean;

import javax.transaction.xa.XAResource;

import org.messaginghub.pooled.jms.JmsPoolSession;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
Expand All @@ -24,9 +30,6 @@
import jakarta.transaction.Status;
import jakarta.transaction.SystemException;
import jakarta.transaction.TransactionManager;
import javax.transaction.xa.XAResource;

import org.messaginghub.pooled.jms.JmsPoolSession;

/**
* An XA-aware connection pool. When a session is created and an xa transaction
Expand All @@ -51,6 +54,7 @@ protected Session makeSession(PooledSessionKey key) throws JMSException {
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);

if (isXa) {
// if the xa tx aborts inflight we don't want to auto create a
// local transaction or auto ack
Expand All @@ -63,16 +67,29 @@ public Session createSession(boolean transacted, int ackMode) throws JMSExceptio
ackMode = Session.AUTO_ACKNOWLEDGE;
}
}
JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode);

final JmsPoolSession session = (JmsPoolSession) super.createSession(transacted, ackMode);

session.setIgnoreClose(isXa);
session.setIsXa(isXa);

if (isXa) {
session.setIgnoreClose(true);
session.setIsXa(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
} else {
session.setIgnoreClose(false);

final JmsPooledXASessionSynchronization sync = new JmsPooledXASessionSynchronization(session);

try {
transactionManager.getTransaction().registerSynchronization(sync);

if (!transactionManager.getTransaction().enlistResource(createXaResource(session))) {
throw new JMSException("Enlistment of Pooled Session into transaction failed");
}
} catch (Exception ex) {
sync.close();
throw ex;
}
}

return session;
} catch (RollbackException e) {
final JMSException jmsException = new JMSException("Rollback Exception");
Expand All @@ -89,25 +106,37 @@ protected XAResource createXaResource(JmsPoolSession session) throws JMSExceptio
return session.getXAResource();
}

protected class Synchronization implements jakarta.transaction.Synchronization {
protected class JmsPooledXASessionSynchronization implements jakarta.transaction.Synchronization {

private final JmsPoolSession session;
private final AtomicBoolean closed = new AtomicBoolean();

private Synchronization(JmsPoolSession session) {
private JmsPoolSession session;

private JmsPooledXASessionSynchronization(JmsPoolSession session) {
this.session = session;
}

public void close() throws JMSException {
if (closed.compareAndSet(false, true)) {
// This will return session to the pool.
session.setIgnoreClose(false);
try {
session.close();
} finally {
session = null;
decrementReferenceCount();
}
}
}

@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
decrementReferenceCount();
close();
} catch (JMSException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit c79b2b3

Please sign in to comment.