Skip to content

Commit

Permalink
Improve Narayana recovery manager service and integrate it with agroal
Browse files Browse the repository at this point in the history
  • Loading branch information
zhfeng committed Sep 9, 2022
1 parent 382511f commit 52c8f4d
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
Expand All @@ -48,6 +49,7 @@
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem;
import io.quarkus.narayana.jta.deployment.NarayanaInitBuildItem;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;

Expand Down Expand Up @@ -221,6 +223,7 @@ void generateDataSourceSupportBean(AgroalRecorder recorder,

@Record(ExecutionTime.RUNTIME_INIT)
@BuildStep
@Consume(NarayanaInitBuildItem.class)
void generateDataSourceBeans(AgroalRecorder recorder,
DataSourcesRuntimeConfig dataSourcesRuntimeConfig,
List<AggregatedDataSourceBuildTimeConfigBuildItem> aggregatedBuildTimeConfigBuildItems,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.transaction.TransactionSynchronizationRegistry;

import org.jboss.logging.Logger;
import org.jboss.tm.XAResourceRecoveryRegistry;

import io.agroal.api.AgroalDataSource;
import io.agroal.api.AgroalPoolInterceptor;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class DataSources {
private final DataSourcesJdbcBuildTimeConfig dataSourcesJdbcBuildTimeConfig;
private final DataSourcesJdbcRuntimeConfig dataSourcesJdbcRuntimeConfig;
private final TransactionManager transactionManager;
private final XAResourceRecoveryRegistry xaResourceRecoveryRegistry;
private final TransactionSynchronizationRegistry transactionSynchronizationRegistry;
private final DataSourceSupport dataSourceSupport;
private final Instance<AgroalPoolInterceptor> agroalPoolInterceptors;
Expand All @@ -82,13 +84,15 @@ public DataSources(DataSourcesBuildTimeConfig dataSourcesBuildTimeConfig,
DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesJdbcBuildTimeConfig dataSourcesJdbcBuildTimeConfig,
DataSourcesJdbcRuntimeConfig dataSourcesJdbcRuntimeConfig,
TransactionManager transactionManager,
XAResourceRecoveryRegistry xaResourceRecoveryRegistry,
TransactionSynchronizationRegistry transactionSynchronizationRegistry, DataSourceSupport dataSourceSupport,
@Any Instance<AgroalPoolInterceptor> agroalPoolInterceptors) {
this.dataSourcesBuildTimeConfig = dataSourcesBuildTimeConfig;
this.dataSourcesRuntimeConfig = dataSourcesRuntimeConfig;
this.dataSourcesJdbcBuildTimeConfig = dataSourcesJdbcBuildTimeConfig;
this.dataSourcesJdbcRuntimeConfig = dataSourcesJdbcRuntimeConfig;
this.transactionManager = transactionManager;
this.xaResourceRecoveryRegistry = xaResourceRecoveryRegistry;
this.transactionSynchronizationRegistry = transactionSynchronizationRegistry;
this.dataSourceSupport = dataSourceSupport;
this.agroalPoolInterceptors = agroalPoolInterceptors;
Expand Down Expand Up @@ -268,7 +272,10 @@ private void applyNewConfiguration(AgroalDataSourceConfigurationSupplier dataSou

if (dataSourceJdbcBuildTimeConfig.transactions != io.quarkus.agroal.runtime.TransactionIntegration.DISABLED) {
TransactionIntegration txIntegration = new NarayanaTransactionIntegration(transactionManager,
transactionSynchronizationRegistry);
transactionSynchronizationRegistry, null, false,
dataSourceJdbcBuildTimeConfig.transactions == io.quarkus.agroal.runtime.TransactionIntegration.XA
? xaResourceRecoveryRegistry
: null);
poolConfiguration.transactionIntegration(txIntegration);
}

Expand All @@ -287,12 +294,14 @@ private void applyNewConfiguration(AgroalDataSourceConfigurationSupplier dataSou

// Authentication
if (dataSourceRuntimeConfig.username.isPresent()) {
NamePrincipal username = new NamePrincipal(dataSourceRuntimeConfig.username.get());
connectionFactoryConfiguration
.principal(new NamePrincipal(dataSourceRuntimeConfig.username.get()));
.principal(username).recoveryPrincipal(username);
}
if (dataSourceRuntimeConfig.password.isPresent()) {
SimplePassword password = new SimplePassword(dataSourceRuntimeConfig.password.get());
connectionFactoryConfiguration
.credential(new SimplePassword(dataSourceRuntimeConfig.password.get()));
.credential(password).recoveryCredential(password);
}

// credentials provider
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.quarkus.narayana.jta.deployment;

import io.quarkus.builder.item.EmptyBuildItem;

/**
* Marker build item that indicates that the Narayana JTA extension has been initialized.
*/
public final class NarayanaInitBuildItem extends EmptyBuildItem {
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@
import javax.transaction.TransactionScoped;

import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
import com.arjuna.ats.internal.arjuna.coordinator.CheckedActionFactoryImple;
import com.arjuna.ats.internal.arjuna.objectstore.ShadowNoFileLockStore;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionExpiryScanner;
import com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule;
import com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner;
import com.arjuna.ats.internal.arjuna.utils.SocketProcessId;
import com.arjuna.ats.internal.jta.recovery.arjunacore.CommitMarkableResourceRecordRecoveryModule;
import com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter;
import com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter;
import com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter;
import com.arjuna.ats.internal.jta.recovery.arjunacore.RecoverConnectableAtomicAction;
import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
import com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple;
Expand All @@ -32,6 +41,7 @@
import io.quarkus.deployment.IsTest;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Produce;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
Expand Down Expand Up @@ -64,6 +74,7 @@ public NativeImageSystemPropertyBuildItem nativeImageSystemPropertyBuildItem() {

@BuildStep
@Record(RUNTIME_INIT)
@Produce(NarayanaInitBuildItem.class)
public void build(NarayanaJtaRecorder recorder,
BuildProducer<AdditionalBeanBuildItem> additionalBeans,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
Expand All @@ -81,6 +92,9 @@ public void build(NarayanaJtaRecorder recorder,
runtimeInit.produce(new RuntimeInitializedClassBuildItem(SocketProcessId.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(CommitMarkableResourceRecordRecoveryModule.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(RecoverConnectableAtomicAction.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(TransactionStatusConnectionManager.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(JTAActionStatusServiceXAResourceOrphanFilter.class.getName()));
runtimeInit.produce(new RuntimeInitializedClassBuildItem(AtomicActionExpiryScanner.class.getName()));

reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JTAEnvironmentBean.class.getName(),
UserTransactionImple.class.getName(),
Expand All @@ -89,7 +103,14 @@ public void build(NarayanaJtaRecorder recorder,
TransactionSynchronizationRegistryImple.class.getName(),
ObjectStoreEnvironmentBean.class.getName(),
ShadowNoFileLockStore.class.getName(),
SocketProcessId.class.getName()));
SocketProcessId.class.getName(),
AtomicActionRecoveryModule.class.getName(),
XARecoveryModule.class.getName(),
XAResourceRecord.class.getName(),
JTATransactionLogXAResourceOrphanFilter.class.getName(),
JTANodeNameXAResourceOrphanFilter.class.getName(),
JTAActionStatusServiceXAResourceOrphanFilter.class.getName(),
ExpiredTransactionStatusManagerScanner.class.getName()));

AdditionalBeanBuildItem.Builder builder = AdditionalBeanBuildItem.builder();
builder.addBeanClass(TransactionalInterceptorSupports.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
import javax.transaction.TransactionSynchronizationRegistry;

import org.jboss.tm.JBossXATerminator;
Expand Down Expand Up @@ -32,9 +33,12 @@ public javax.transaction.UserTransaction userTransaction() {
}

@Produces
@ApplicationScoped
@Singleton
public XAResourceRecoveryRegistry xaResourceRecoveryRegistry() {
return new RecoveryManagerService();
RecoveryManagerService recoveryManagerService = new RecoveryManagerService();
recoveryManagerService.create();
recoveryManagerService.start();
return recoveryManagerService;
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
import org.jboss.logging.Logger;

import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.RecoveryEnvironmentBean;
import com.arjuna.ats.arjuna.common.arjPropertyManager;
import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.jta.common.JTAEnvironmentBean;
import com.arjuna.ats.jta.common.jtaPropertyManager;
import com.arjuna.common.internal.util.propertyservice.BeanPopulator;
import com.arjuna.common.util.propertyservice.PropertiesFactory;

import io.quarkus.runtime.ShutdownContext;
Expand Down Expand Up @@ -67,13 +72,25 @@ public void disableTransactionStatusManager() {
}

public void setConfig(final TransactionManagerConfiguration transactions) {
arjPropertyManager.getObjectStoreEnvironmentBean().setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class)
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "communicationStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, "stateStore")
.setObjectStoreDir(transactions.objectStoreDirectory);
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
.setRecoveryModuleClassNames(transactions.recoveryModules);
BeanPopulator.getDefaultInstance(RecoveryEnvironmentBean.class)
.setExpiryScannerClassNames(transactions.expiryScanners);
BeanPopulator.getDefaultInstance(JTAEnvironmentBean.class)
.setXaResourceOrphanFilterClassNames(transactions.xaResourceOrphanFilters);
}

public void handleShutdown(ShutdownContext context) {
context.addLastShutdownTask(new Runnable() {
@Override
public void run() {
RecoveryManager.manager().terminate(true);
TransactionReaper.terminate(false);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.narayana.jta.runtime;

import java.time.Duration;
import java.util.List;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
Expand Down Expand Up @@ -30,4 +31,25 @@ public final class TransactionManagerConfiguration {
*/
@ConfigItem(defaultValue = "ObjectStore")
public String objectStoreDirectory;

/**
* The list of recovery modules
*/
@ConfigItem(defaultValue = "com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule," +
"com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule")
public List<String> recoveryModules;

/**
* The list of expiry scanners
*/
@ConfigItem(defaultValue = "com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner")
public List<String> expiryScanners;

/**
* The list of orphan filters
*/
@ConfigItem(defaultValue = "com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter," +
"com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter," +
"com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter")
public List<String> xaResourceOrphanFilters;
}

0 comments on commit 52c8f4d

Please sign in to comment.