Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arc - transactional observers should register standard synchronization instead of interposed #19873

Merged
merged 1 commit into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.jboss.logging.Logger;

import io.quarkus.arc.Unremovable;

/**
* A delegating transaction manager which receives an instance of Narayana transaction manager
* and delegates all calls to it.
* On top of it the implementation adds the CDI events processing for {@link TransactionScoped}.
*/
@Singleton
@Unremovable // used by Arc for transactional observers
public class CDIDelegatingTransactionManager implements TransactionManager, Serializable {

private static final Logger log = Logger.getLogger(CDIDelegatingTransactionManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jta.UserTransaction;

import io.quarkus.arc.Unremovable;

@Dependent
public class NarayanaJtaProducers {

Expand All @@ -39,7 +37,6 @@ public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() {

@Produces
@ApplicationScoped
@Unremovable // needed by Arc for transactional observers
public TransactionSynchronizationRegistry transactionSynchronizationRegistry() {
return new TransactionSynchronizationRegistryImple();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import javax.enterprise.util.TypeLiteral;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.jboss.logging.Logger;

/**
Expand All @@ -56,6 +57,8 @@ class EventImpl<T> implements Event<T> {

private transient volatile Notifier<? super T> lastNotifier;

private static final Logger LOGGER = Logger.getLogger(EventImpl.class);

EventImpl(Type eventType, Set<Annotation> qualifiers) {
this.eventType = initEventType(eventType);
this.injectionPointTypeHierarchy = new HierarchyDiscovery(this.eventType);
Expand Down Expand Up @@ -246,35 +249,47 @@ void notify(T event, ObserverExceptionHandler exceptionHandler, boolean async) {

if (!async && hasTxObservers) {
// Note that tx observers are never async
InstanceHandle<TransactionSynchronizationRegistry> registryInstance = Arc.container()
.instance(TransactionSynchronizationRegistry.class);

if (registryInstance.isAvailable() &&
registryInstance.get().getTransactionStatus() == javax.transaction.Status.STATUS_ACTIVE) {
// we have one or more transactional OM, and TransactionSynchronizationRegistry is available
// we attempt to register a JTA synchronization
List<DeferredEventNotification<?>> deferredEvents = new ArrayList<>();
EventContext eventContext = new EventContextImpl<>(event, eventMetadata);

for (ObserverMethod<? super T> om : observerMethods) {
if (isTxObserver(om)) {
deferredEvents.add(new DeferredEventNotification<>(om, eventContext,
Status.valueOf(om.getTransactionPhase())));
InstanceHandle<TransactionManager> transactionManagerInstance = Arc.container()
.instance(TransactionManager.class);

try {
if (transactionManagerInstance.isAvailable() &&
transactionManagerInstance.get().getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
// we have one or more transactional OM, and TransactionManager is available
// we attempt to register a JTA synchronization
List<DeferredEventNotification<?>> deferredEvents = new ArrayList<>();
EventContext eventContext = new EventContextImpl<>(event, eventMetadata);

for (ObserverMethod<? super T> om : observerMethods) {
if (isTxObserver(om)) {
deferredEvents.add(new DeferredEventNotification<>(om, eventContext,
Status.valueOf(om.getTransactionPhase())));
}
}
}

Synchronization sync = new ArcSynchronization(deferredEvents);
TransactionSynchronizationRegistry registry = registryInstance.get();
try {
registry.registerInterposedSynchronization(sync);
// registration succeeded, notify all non-tx observers synchronously
predicate = predicate.and(this::isNotTxObserver);
} catch (Exception e) {
if (e.getCause() instanceof RollbackException || e.getCause() instanceof IllegalStateException) {
// registration failed, AFTER_SUCCESS OMs are accordingly to CDI spec left out
predicate = predicate.and(this::isNotAfterSuccess);
Synchronization sync = new ArcSynchronization(deferredEvents);
TransactionManager txManager = transactionManagerInstance.get();
try {
// NOTE - We are using standard synchronization on purpose as that seems more
// fitting than interposed sync. Either way will have some use-cases that won't work.
// See for instance discussions on https://github.com/eclipse-ee4j/cdi/issues/467
txManager.getTransaction().registerSynchronization(sync);
// registration succeeded, notify all non-tx observers synchronously
predicate = predicate.and(this::isNotTxObserver);
} catch (Exception e) {
if (e.getCause() instanceof RollbackException
|| e.getCause() instanceof IllegalStateException
|| e.getCause() instanceof SystemException) {
// registration failed, AFTER_SUCCESS OMs are accordingly to CDI spec left out
predicate = predicate.and(this::isNotAfterSuccess);
}
}
}
} catch (SystemException e) {
// In theory, this can be thrown by TransactionManager#getStatus() at which point we cannot even
// determine if we should register some synchronization, therefore, we only log this
LOGGER.debugf("Failure when trying to invoke TransactionManager#getStatus(). Stacktrace: %s",
e.getCause() != null ? e.getCause() : e);
}
}

Expand Down