From d1c85668f231d5ec1e3f39664d86f7cac12d191b Mon Sep 17 00:00:00 2001 From: Zheng Feng Date: Wed, 15 Jun 2022 08:58:13 +0800 Subject: [PATCH] Improve Narayana recovery manager service and integrate it with agroal --- .../agroal/deployment/AgroalProcessor.java | 3 +++ .../quarkus/agroal/runtime/DataSources.java | 15 +++++++++--- .../jta/deployment/NarayanaInitBuildItem.java | 9 ++++++++ .../jta/deployment/NarayanaJtaProcessor.java | 23 ++++++++++++++++++- .../jta/runtime/NarayanaJtaProducers.java | 8 +++++-- .../jta/runtime/NarayanaJtaRecorder.java | 19 ++++++++++++++- .../TransactionManagerConfiguration.java | 22 ++++++++++++++++++ 7 files changed, 92 insertions(+), 7 deletions(-) create mode 100644 extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaInitBuildItem.java diff --git a/extensions/agroal/deployment/src/main/java/io/quarkus/agroal/deployment/AgroalProcessor.java b/extensions/agroal/deployment/src/main/java/io/quarkus/agroal/deployment/AgroalProcessor.java index 769ad707fd29e..e90502dc4dfe2 100644 --- a/extensions/agroal/deployment/src/main/java/io/quarkus/agroal/deployment/AgroalProcessor.java +++ b/extensions/agroal/deployment/src/main/java/io/quarkus/agroal/deployment/AgroalProcessor.java @@ -40,6 +40,7 @@ import io.quarkus.deployment.Feature; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; @@ -48,6 +49,7 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem; +import io.quarkus.narayana.jta.deployment.NarayanaInitBuildItem; import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; @@ -221,6 +223,7 @@ void generateDataSourceSupportBean(AgroalRecorder recorder, @Record(ExecutionTime.RUNTIME_INIT) @BuildStep + @Consume(NarayanaInitBuildItem.class) void generateDataSourceBeans(AgroalRecorder recorder, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, List aggregatedBuildTimeConfigBuildItems, diff --git a/extensions/agroal/runtime/src/main/java/io/quarkus/agroal/runtime/DataSources.java b/extensions/agroal/runtime/src/main/java/io/quarkus/agroal/runtime/DataSources.java index 5008a18b6da99..9adbe0abf066c 100644 --- a/extensions/agroal/runtime/src/main/java/io/quarkus/agroal/runtime/DataSources.java +++ b/extensions/agroal/runtime/src/main/java/io/quarkus/agroal/runtime/DataSources.java @@ -22,6 +22,7 @@ import javax.transaction.TransactionSynchronizationRegistry; import org.jboss.logging.Logger; +import org.jboss.tm.XAResourceRecoveryRegistry; import io.agroal.api.AgroalDataSource; import io.agroal.api.AgroalPoolInterceptor; @@ -72,6 +73,7 @@ public class DataSources { private final DataSourcesJdbcBuildTimeConfig dataSourcesJdbcBuildTimeConfig; private final DataSourcesJdbcRuntimeConfig dataSourcesJdbcRuntimeConfig; private final TransactionManager transactionManager; + private final XAResourceRecoveryRegistry xaResourceRecoveryRegistry; private final TransactionSynchronizationRegistry transactionSynchronizationRegistry; private final DataSourceSupport dataSourceSupport; private final Instance agroalPoolInterceptors; @@ -82,6 +84,7 @@ public DataSources(DataSourcesBuildTimeConfig dataSourcesBuildTimeConfig, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesJdbcBuildTimeConfig dataSourcesJdbcBuildTimeConfig, DataSourcesJdbcRuntimeConfig dataSourcesJdbcRuntimeConfig, TransactionManager transactionManager, + XAResourceRecoveryRegistry xaResourceRecoveryRegistry, TransactionSynchronizationRegistry transactionSynchronizationRegistry, DataSourceSupport dataSourceSupport, @Any Instance agroalPoolInterceptors) { this.dataSourcesBuildTimeConfig = dataSourcesBuildTimeConfig; @@ -89,6 +92,7 @@ public DataSources(DataSourcesBuildTimeConfig dataSourcesBuildTimeConfig, this.dataSourcesJdbcBuildTimeConfig = dataSourcesJdbcBuildTimeConfig; this.dataSourcesJdbcRuntimeConfig = dataSourcesJdbcRuntimeConfig; this.transactionManager = transactionManager; + this.xaResourceRecoveryRegistry = xaResourceRecoveryRegistry; this.transactionSynchronizationRegistry = transactionSynchronizationRegistry; this.dataSourceSupport = dataSourceSupport; this.agroalPoolInterceptors = agroalPoolInterceptors; @@ -268,7 +272,10 @@ private void applyNewConfiguration(AgroalDataSourceConfigurationSupplier dataSou if (dataSourceJdbcBuildTimeConfig.transactions != io.quarkus.agroal.runtime.TransactionIntegration.DISABLED) { TransactionIntegration txIntegration = new NarayanaTransactionIntegration(transactionManager, - transactionSynchronizationRegistry); + transactionSynchronizationRegistry, null, false, + dataSourceJdbcBuildTimeConfig.transactions == io.quarkus.agroal.runtime.TransactionIntegration.XA + ? xaResourceRecoveryRegistry + : null); poolConfiguration.transactionIntegration(txIntegration); } @@ -287,12 +294,14 @@ private void applyNewConfiguration(AgroalDataSourceConfigurationSupplier dataSou // Authentication if (dataSourceRuntimeConfig.username.isPresent()) { + NamePrincipal username = new NamePrincipal(dataSourceRuntimeConfig.username.get()); connectionFactoryConfiguration - .principal(new NamePrincipal(dataSourceRuntimeConfig.username.get())); + .principal(username).recoveryPrincipal(username); } if (dataSourceRuntimeConfig.password.isPresent()) { + SimplePassword password = new SimplePassword(dataSourceRuntimeConfig.password.get()); connectionFactoryConfiguration - .credential(new SimplePassword(dataSourceRuntimeConfig.password.get())); + .credential(password).recoveryCredential(password); } // credentials provider diff --git a/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaInitBuildItem.java b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaInitBuildItem.java new file mode 100644 index 0000000000000..5cbee9c7ce40d --- /dev/null +++ b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaInitBuildItem.java @@ -0,0 +1,9 @@ +package io.quarkus.narayana.jta.deployment; + +import io.quarkus.builder.item.EmptyBuildItem; + +/** + * Marker build item that indicates that the Narayana JTA extension has been initialized. + */ +public final class NarayanaInitBuildItem extends EmptyBuildItem { +} diff --git a/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java index 397f771bdfc79..18e88ec312178 100644 --- a/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java +++ b/extensions/narayana-jta/deployment/src/main/java/io/quarkus/narayana/jta/deployment/NarayanaJtaProcessor.java @@ -10,11 +10,20 @@ import javax.transaction.TransactionScoped; import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean; +import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager; import com.arjuna.ats.internal.arjuna.coordinator.CheckedActionFactoryImple; import com.arjuna.ats.internal.arjuna.objectstore.ShadowNoFileLockStore; +import com.arjuna.ats.internal.arjuna.recovery.AtomicActionExpiryScanner; +import com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule; +import com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner; import com.arjuna.ats.internal.arjuna.utils.SocketProcessId; import com.arjuna.ats.internal.jta.recovery.arjunacore.CommitMarkableResourceRecordRecoveryModule; +import com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter; +import com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter; +import com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter; import com.arjuna.ats.internal.jta.recovery.arjunacore.RecoverConnectableAtomicAction; +import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule; +import com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord; import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple; import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple; import com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple; @@ -32,6 +41,7 @@ import io.quarkus.deployment.IsTest; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Produce; import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.ShutdownContextBuildItem; @@ -64,6 +74,7 @@ public NativeImageSystemPropertyBuildItem nativeImageSystemPropertyBuildItem() { @BuildStep @Record(RUNTIME_INIT) + @Produce(NarayanaInitBuildItem.class) public void build(NarayanaJtaRecorder recorder, BuildProducer additionalBeans, BuildProducer reflectiveClass, @@ -81,6 +92,9 @@ public void build(NarayanaJtaRecorder recorder, runtimeInit.produce(new RuntimeInitializedClassBuildItem(SocketProcessId.class.getName())); runtimeInit.produce(new RuntimeInitializedClassBuildItem(CommitMarkableResourceRecordRecoveryModule.class.getName())); runtimeInit.produce(new RuntimeInitializedClassBuildItem(RecoverConnectableAtomicAction.class.getName())); + runtimeInit.produce(new RuntimeInitializedClassBuildItem(TransactionStatusConnectionManager.class.getName())); + runtimeInit.produce(new RuntimeInitializedClassBuildItem(JTAActionStatusServiceXAResourceOrphanFilter.class.getName())); + runtimeInit.produce(new RuntimeInitializedClassBuildItem(AtomicActionExpiryScanner.class.getName())); reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JTAEnvironmentBean.class.getName(), UserTransactionImple.class.getName(), @@ -89,7 +103,14 @@ public void build(NarayanaJtaRecorder recorder, TransactionSynchronizationRegistryImple.class.getName(), ObjectStoreEnvironmentBean.class.getName(), ShadowNoFileLockStore.class.getName(), - SocketProcessId.class.getName())); + SocketProcessId.class.getName(), + AtomicActionRecoveryModule.class.getName(), + XARecoveryModule.class.getName(), + XAResourceRecord.class.getName(), + JTATransactionLogXAResourceOrphanFilter.class.getName(), + JTANodeNameXAResourceOrphanFilter.class.getName(), + JTAActionStatusServiceXAResourceOrphanFilter.class.getName(), + ExpiredTransactionStatusManagerScanner.class.getName())); AdditionalBeanBuildItem.Builder builder = AdditionalBeanBuildItem.builder(); builder.addBeanClass(TransactionalInterceptorSupports.class); diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java index 85e4b99d2e17e..abfb7d0d36f97 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaProducers.java @@ -3,6 +3,7 @@ import javax.enterprise.context.ApplicationScoped; import javax.enterprise.context.Dependent; import javax.enterprise.inject.Produces; +import javax.inject.Singleton; import javax.transaction.TransactionSynchronizationRegistry; import org.jboss.tm.JBossXATerminator; @@ -32,9 +33,12 @@ public javax.transaction.UserTransaction userTransaction() { } @Produces - @ApplicationScoped + @Singleton public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() { - return new RecoveryManagerService(); + RecoveryManagerService recoveryManagerService = new RecoveryManagerService(); + recoveryManagerService.create(); + recoveryManagerService.start(); + return recoveryManagerService; } @Produces diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java index b66dd67487d37..82d2e61b95ec4 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/NarayanaJtaRecorder.java @@ -7,10 +7,15 @@ import org.jboss.logging.Logger; import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException; +import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean; +import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean; import com.arjuna.ats.arjuna.common.arjPropertyManager; import com.arjuna.ats.arjuna.coordinator.TransactionReaper; import com.arjuna.ats.arjuna.coordinator.TxControl; +import com.arjuna.ats.arjuna.recovery.RecoveryManager; +import com.arjuna.ats.jta.common.JTAEnvironmentBean; import com.arjuna.ats.jta.common.jtaPropertyManager; +import com.arjuna.common.internal.util.propertyservice.BeanPopulator; import com.arjuna.common.util.propertyservice.PropertiesFactory; import io.quarkus.runtime.ShutdownContext; @@ -67,13 +72,25 @@ public void disableTransactionStatusManager() { } public void setConfig(final TransactionManagerConfiguration transactions) { - arjPropertyManager.getObjectStoreEnvironmentBean().setObjectStoreDir(transactions.objectStoreDirectory); + BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class) + .setObjectStoreDir(transactions.objectStoreDirectory); + BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "communicationStore") + .setObjectStoreDir(transactions.objectStoreDirectory); + BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "stateStore") + .setObjectStoreDir(transactions.objectStoreDirectory); + BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class) + .setRecoveryModuleClassNames(transactions.recoveryModules); + BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class) + .setExpiryScannerClassNames(transactions.expiryScanners); + BeanPopulator.getDefaultInstance(JTAEnvironmentBean.class) + .setXaResourceOrphanFilterClassNames(transactions.xaResourceOrphanFilters); } public void handleShutdown(ShutdownContext context) { context.addLastShutdownTask(new Runnable() { @Override public void run() { + RecoveryManager.manager().terminate(true); TransactionReaper.terminate(false); } }); diff --git a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java index ee2c70841b308..b4f8ab1ee8cc1 100644 --- a/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java +++ b/extensions/narayana-jta/runtime/src/main/java/io/quarkus/narayana/jta/runtime/TransactionManagerConfiguration.java @@ -1,6 +1,7 @@ package io.quarkus.narayana.jta.runtime; import java.time.Duration; +import java.util.List; import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; @@ -30,4 +31,25 @@ public final class TransactionManagerConfiguration { */ @ConfigItem(defaultValue = "ObjectStore") public String objectStoreDirectory; + + /** + * The list of recovery modules + */ + @ConfigItem(defaultValue = "com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule," + + "com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule") + public List recoveryModules; + + /** + * The list of expiry scanners + */ + @ConfigItem(defaultValue = "com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner") + public List expiryScanners; + + /** + * The list of orphan filters + */ + @ConfigItem(defaultValue = "com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter," + + "com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter," + + "com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter") + public List xaResourceOrphanFilters; }