Skip to content

Commit

Permalink
Merge pull request #19873 from manovotn/issue18737
Browse files Browse the repository at this point in the history
Arc - transactional observers should register standard synchronization instead of interposed
  • Loading branch information
manovotn authored Sep 6, 2021
2 parents 8e48fb6 + ac40c15 commit e3f69d7
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.jboss.logging.Logger;

import io.quarkus.arc.Unremovable;

/**
* A delegating transaction manager which receives an instance of Narayana transaction manager
* and delegates all calls to it.
* On top of it the implementation adds the CDI events processing for {@link TransactionScoped}.
*/
@Singleton
@Unremovable // used by Arc for transactional observers
public class CDIDelegatingTransactionManager implements TransactionManager, Serializable {

private static final Logger log = Logger.getLogger(CDIDelegatingTransactionManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jta.UserTransaction;

import io.quarkus.arc.Unremovable;

@Dependent
public class NarayanaJtaProducers {

Expand All @@ -39,7 +37,6 @@ public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() {

@Produces
@ApplicationScoped
@Unremovable // needed by Arc for transactional observers
public TransactionSynchronizationRegistry transactionSynchronizationRegistry() {
return new TransactionSynchronizationRegistryImple();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import javax.enterprise.util.TypeLiteral;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.jboss.logging.Logger;

/**
Expand All @@ -56,6 +57,8 @@ class EventImpl<T> implements Event<T> {

private transient volatile Notifier<? super T> lastNotifier;

private static final Logger LOGGER = Logger.getLogger(EventImpl.class);

EventImpl(Type eventType, Set<Annotation> qualifiers) {
this.eventType = initEventType(eventType);
this.injectionPointTypeHierarchy = new HierarchyDiscovery(this.eventType);
Expand Down Expand Up @@ -246,35 +249,47 @@ void notify(T event, ObserverExceptionHandler exceptionHandler, boolean async) {

if (!async && hasTxObservers) {
// Note that tx observers are never async
InstanceHandle<TransactionSynchronizationRegistry> registryInstance = Arc.container()
.instance(TransactionSynchronizationRegistry.class);

if (registryInstance.isAvailable() &&
registryInstance.get().getTransactionStatus() == javax.transaction.Status.STATUS_ACTIVE) {
// we have one or more transactional OM, and TransactionSynchronizationRegistry is available
// we attempt to register a JTA synchronization
List<DeferredEventNotification<?>> deferredEvents = new ArrayList<>();
EventContext eventContext = new EventContextImpl<>(event, eventMetadata);

for (ObserverMethod<? super T> om : observerMethods) {
if (isTxObserver(om)) {
deferredEvents.add(new DeferredEventNotification<>(om, eventContext,
Status.valueOf(om.getTransactionPhase())));
InstanceHandle<TransactionManager> transactionManagerInstance = Arc.container()
.instance(TransactionManager.class);

try {
if (transactionManagerInstance.isAvailable() &&
transactionManagerInstance.get().getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
// we have one or more transactional OM, and TransactionManager is available
// we attempt to register a JTA synchronization
List<DeferredEventNotification<?>> deferredEvents = new ArrayList<>();
EventContext eventContext = new EventContextImpl<>(event, eventMetadata);

for (ObserverMethod<? super T> om : observerMethods) {
if (isTxObserver(om)) {
deferredEvents.add(new DeferredEventNotification<>(om, eventContext,
Status.valueOf(om.getTransactionPhase())));
}
}
}

Synchronization sync = new ArcSynchronization(deferredEvents);
TransactionSynchronizationRegistry registry = registryInstance.get();
try {
registry.registerInterposedSynchronization(sync);
// registration succeeded, notify all non-tx observers synchronously
predicate = predicate.and(this::isNotTxObserver);
} catch (Exception e) {
if (e.getCause() instanceof RollbackException || e.getCause() instanceof IllegalStateException) {
// registration failed, AFTER_SUCCESS OMs are accordingly to CDI spec left out
predicate = predicate.and(this::isNotAfterSuccess);
Synchronization sync = new ArcSynchronization(deferredEvents);
TransactionManager txManager = transactionManagerInstance.get();
try {
// NOTE - We are using standard synchronization on purpose as that seems more
// fitting than interposed sync. Either way will have some use-cases that won't work.
// See for instance discussions on https://github.com/eclipse-ee4j/cdi/issues/467
txManager.getTransaction().registerSynchronization(sync);
// registration succeeded, notify all non-tx observers synchronously
predicate = predicate.and(this::isNotTxObserver);
} catch (Exception e) {
if (e.getCause() instanceof RollbackException
|| e.getCause() instanceof IllegalStateException
|| e.getCause() instanceof SystemException) {
// registration failed, AFTER_SUCCESS OMs are accordingly to CDI spec left out
predicate = predicate.and(this::isNotAfterSuccess);
}
}
}
} catch (SystemException e) {
// In theory, this can be thrown by TransactionManager#getStatus() at which point we cannot even
// determine if we should register some synchronization, therefore, we only log this
LOGGER.debugf("Failure when trying to invoke TransactionManager#getStatus(). Stacktrace: %s",
e.getCause() != null ? e.getCause() : e);
}
}

Expand Down

0 comments on commit e3f69d7

Please sign in to comment.