Skip to content

Commit

Permalink
Support Transaction Manager Recovery (#71)
Browse files Browse the repository at this point in the history
* Support Transaction Manager Recovery

- This registers the Managed Connection Factory to the Recovery service if enabled.
- Also added config options for the recovery credentials

* Also enable recovery on inflow transactions

* Introduce TransactionRecoveryManager

* Add TransactionRecoveryManager tests

* Apply code review suggestions
  • Loading branch information
gastaldi authored Nov 30, 2023
1 parent 3c657a3 commit 07ebc9a
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.jboss.jandex.Type;
import org.jboss.jca.core.api.connectionmanager.ccm.CachedConnectionManager;
import org.jboss.jca.core.connectionmanager.pool.mcp.SemaphoreArrayListManagedConnectionPool;
import org.jboss.jca.core.recovery.DefaultRecoveryPlugin;
import org.jboss.jca.core.spi.recovery.RecoveryPlugin;
import org.jboss.jca.core.spi.transaction.TransactionIntegration;
import org.jboss.jca.core.tx.jbossts.TransactionIntegrationImpl;

Expand All @@ -32,6 +34,7 @@
import io.quarkiverse.ironjacamar.runtime.IronJacamarRecorder;
import io.quarkiverse.ironjacamar.runtime.IronJacamarSupport;
import io.quarkiverse.ironjacamar.runtime.QuarkusIronJacamarLogger;
import io.quarkiverse.ironjacamar.runtime.TransactionRecoveryManager;
import io.quarkiverse.ironjacamar.runtime.security.QuarkusSecurityIntegration;
import io.quarkus.arc.BeanDestroyer;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
Expand All @@ -53,6 +56,7 @@
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.smallrye.common.annotation.Identifier;
Expand All @@ -71,7 +75,7 @@ FeatureBuildItem feature() {
@BuildStep
void additionalBeans(BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
additionalBeans.produce(AdditionalBeanBuildItem.builder()
.addBeanClasses(TransactionIntegrationImpl.class)
.addBeanClasses(TransactionIntegrationImpl.class, DefaultRecoveryPlugin.class)
.setUnremovable()
.setDefaultScope(DotNames.SINGLETON)
.build());
Expand Down Expand Up @@ -169,10 +173,16 @@ void registerSingletonSyntheticBeans(BuildProducer<SyntheticBeanBuildItem> produ
})
.done());

producer.produce(SyntheticBeanBuildItem.configure(QuarkusSecurityIntegration.class)
.scope(BuiltinScope.DEPENDENT.getInfo())
// Produce TransactionRecoveryManager
producer.produce(SyntheticBeanBuildItem.configure(TransactionRecoveryManager.class)
.scope(BuiltinScope.SINGLETON.getInfo())
.setRuntimeInit()
.unremovable()
.createWith(recorder.createSecurityIntegration())
.addInjectionPoint(ClassType.create(DotName.createSimple(TransactionIntegration.class)))
.addInjectionPoint(ClassType.create(DotName.createSimple(TransactionManagerConfiguration.class)))
.addInjectionPoint(ClassType.create(DotName.createSimple(RecoveryPlugin.class)))
.createWith(recorder.createTransactionRecoveryManager())
.destroyer(BeanDestroyer.CloseableDestroyer.class)
.done());
}

Expand All @@ -185,6 +195,12 @@ void registerSyntheticBeans(
List<ResourceAdapterKindBuildItem> kinds,
BuildProducer<SyntheticBeanBuildItem> producer,
BuildProducer<ContainerCreatedBuildItem> createdProducer) {
// Register security integration as a dependent bean
producer.produce(SyntheticBeanBuildItem.configure(QuarkusSecurityIntegration.class)
.scope(BuiltinScope.DEPENDENT.getInfo())
.unremovable()
.createWith(recorder.createSecurityIntegration())
.done());
IndexView index = combinedIndexBuildItem.getIndex();
Type containerType = Type.create(DotName.createSimple(IronJacamarContainer.class), Type.Kind.CLASS);
var kindsMap = kinds.stream().collect(Collectors.toMap(ResourceAdapterKindBuildItem::getKind, Function.identity()));
Expand Down
57 changes: 57 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-ironjacamar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,63 @@ endif::add-copy-button-to-env-var[]
|`false`


a| [[quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.username]]`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.username[quarkus.ironjacamar.ra.cm.recovery.username]`

`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.username[quarkus.ironjacamar."resource-adapter-name".ra.cm.recovery.username]`


[.description]
--
The recovery username for the Connection Manager

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_USERNAME+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_USERNAME+++`
endif::add-copy-button-to-env-var[]
--|string
|


a| [[quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.password]]`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.password[quarkus.ironjacamar.ra.cm.recovery.password]`

`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.password[quarkus.ironjacamar."resource-adapter-name".ra.cm.recovery.password]`


[.description]
--
The recovery password for the Connection Manager

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_PASSWORD+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_PASSWORD+++`
endif::add-copy-button-to-env-var[]
--|string
|


a| [[quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.security-domain]]`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.security-domain[quarkus.ironjacamar.ra.cm.recovery.security-domain]`

`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.recovery.security-domain[quarkus.ironjacamar."resource-adapter-name".ra.cm.recovery.security-domain]`


[.description]
--
The recovery security domain for the Connection Manager

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_SECURITY_DOMAIN+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_IRONJACAMAR_RA_CM_RECOVERY_SECURITY_DOMAIN+++`
endif::add-copy-button-to-env-var[]
--|string
|


a| [[quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.pool.strategy]]`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.pool.strategy[quarkus.ironjacamar.ra.cm.pool.strategy]`

`link:#quarkus-ironjacamar_quarkus.ironjacamar.ra.cm.pool.strategy[quarkus.ironjacamar."resource-adapter-name".ra.cm.pool.strategy]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ quarkus.ironjacamar.ra.config.password=guest
#Enable pool metrics
quarkus.ironjacamar.metrics.enabled=true

# Enable recovery
quarkus.transaction-manager.enable-recovery=true

#Activation Configs
quarkus.ironjacamar.activation-spec.myqueue.config.destination-type=jakarta.jms.Queue
quarkus.ironjacamar.activation-spec.myqueue.config.destination=MyQueue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.quarkiverse.ironjacamar.it;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.SoftAssertions.assertSoftly;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;

import org.junit.jupiter.api.Test;

import io.quarkiverse.ironjacamar.runtime.IronJacamarContainer;
import io.quarkiverse.ironjacamar.runtime.TransactionRecoveryManager;
import io.quarkus.arc.Arc;
import io.quarkus.test.junit.QuarkusTest;

Expand All @@ -17,6 +20,9 @@ public class InjectionTest {
@Inject
ConnectionFactory connectionFactory;

@Inject
Instance<TransactionRecoveryManager> transactionRecoveryManagerInstance;

@Test
public void testProducer() {
assertThat(Arc.container().listAll(IronJacamarContainer.class)).hasSize(1);
Expand All @@ -28,4 +34,15 @@ public void shouldInjectConnectionFactory() {
assertThat(connectionFactory).isNotNull();
}

@Test
public void shouldInjectTransactionRecoveryManager() {
assertSoftly(softly -> {
softly.assertThat(Arc.container().listAll(TransactionRecoveryManager.class)).hasSize(1);
softly.assertThat(transactionRecoveryManagerInstance).isNotNull();
softly.assertThat(transactionRecoveryManagerInstance.isResolvable()).isTrue();
softly.assertThat(transactionRecoveryManagerInstance.get()).isNotNull();
softly.assertThat(transactionRecoveryManagerInstance.get().isEnabled()).isTrue();
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,27 @@
*/
public interface ResourceAdapterFactory {

/**
* The product name of the resource adapter.
*/
default String getProductName() {
return toString();
}

/**
* The product version of the resource adapter.
*/
default String getProductVersion() {
return "1.0";
}

/**
* A human-readable description of the resource adapter.
*
* @return the description that is displayed in the logs
*/
default String getDescription() {
return toString();
return getProductName() + " " + getProductVersion();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@
import org.jboss.jca.core.connectionmanager.pool.api.Pool;
import org.jboss.jca.core.connectionmanager.pool.api.PoolFactory;
import org.jboss.jca.core.connectionmanager.pool.mcp.ManagedConnectionPoolFactory;
import org.jboss.jca.core.spi.recovery.RecoveryPlugin;
import org.jboss.jca.core.spi.transaction.TransactionIntegration;

@Dependent
public class ConnectionManagerFactory {

private final TransactionIntegration transactionIntegration;

private final CachedConnectionManager ccm;
private final RecoveryPlugin recoveryPlugin;

@Inject
public ConnectionManagerFactory(TransactionIntegration transactionIntegration, CachedConnectionManager ccm) {
public ConnectionManagerFactory(TransactionIntegration transactionIntegration, CachedConnectionManager ccm,
RecoveryPlugin recoveryPlugin) {
this.transactionIntegration = transactionIntegration;
this.ccm = ccm;
this.recoveryPlugin = recoveryPlugin;
}

public ConnectionManager createConnectionManager(String id, ManagedConnectionFactory mcf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ public class IronJacamarContainer implements Closeable {
private final ResourceAdapter resourceAdapter;
private final ManagedConnectionFactory managedConnectionFactory;
private final ConnectionManager connectionManager;
private final TransactionRecoveryManager transactionRecoveryManager;

public IronJacamarContainer(ResourceAdapterFactory resourceAdapterFactory,
ResourceAdapter resourceAdapter,
ManagedConnectionFactory managedConnectionFactory,
ConnectionManager connectionManager) {
ConnectionManager connectionManager,
TransactionRecoveryManager transactionRecoveryManager) {
this.resourceAdapterFactory = resourceAdapterFactory;
this.resourceAdapter = resourceAdapter;
this.managedConnectionFactory = managedConnectionFactory;
this.connectionManager = connectionManager;
this.transactionRecoveryManager = transactionRecoveryManager;
}

public ResourceAdapter getResourceAdapter() {
Expand All @@ -56,6 +59,10 @@ public void endpointActivation(Class<?> endpointClass, String identifier, Map<St
DefaultMessageEndpointFactory messageEndpointFactory = new DefaultMessageEndpointFactory(endpointClass, identifier,
resourceAdapterFactory);
resourceAdapter.endpointActivation(messageEndpointFactory, activationSpec);
if (transactionRecoveryManager.isEnabled()) {
transactionRecoveryManager.registerForRecovery(resourceAdapter, activationSpec,
resourceAdapterFactory.getProductName(), resourceAdapterFactory.getProductVersion());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.jboss.jca.core.bootstrapcontext.BaseCloneableBootstrapContext;
import org.jboss.jca.core.bootstrapcontext.BootstrapContextCoordinator;
import org.jboss.jca.core.connectionmanager.ccm.CachedConnectionManagerImpl;
import org.jboss.jca.core.spi.recovery.RecoveryPlugin;
import org.jboss.jca.core.spi.security.SecurityIntegration;
import org.jboss.jca.core.spi.transaction.TransactionIntegration;
import org.jboss.jca.core.workmanager.WorkManagerCoordinator;
Expand All @@ -24,6 +25,7 @@
import io.quarkiverse.ironjacamar.runtime.security.QuarkusSecurityIntegration;
import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.common.annotation.Identifier;
Expand Down Expand Up @@ -68,6 +70,15 @@ public Function<SyntheticCreationalContext<QuarkusSecurityIntegration>, QuarkusS
return context -> new QuarkusSecurityIntegration();
}

public Function<SyntheticCreationalContext<TransactionRecoveryManager>, TransactionRecoveryManager> createTransactionRecoveryManager() {
return context -> {
TransactionIntegration ti = context.getInjectedReference(TransactionIntegration.class);
TransactionManagerConfiguration tmConfig = context.getInjectedReference(TransactionManagerConfiguration.class);
RecoveryPlugin recoveryPlugin = context.getInjectedReference(RecoveryPlugin.class);
return new TransactionRecoveryManager(ti, recoveryPlugin, tmConfig.enableRecovery);
};
}

public void initDefaultBootstrapContext(BeanContainer beanContainer) {
TransactionIntegration transactionIntegration = beanContainer.beanInstance(TransactionIntegration.class);
SecurityIntegration securityIntegration = beanContainer.beanInstance(QuarkusSecurityIntegration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,29 @@ interface ConnectionManagerConfig {
@WithDefault("false")
boolean padXid();

/**
* The recovery configuration for the Connection Manager
*/
RecoveryConfig recovery();

@ConfigGroup
interface RecoveryConfig {
/**
* The recovery username for the Connection Manager
*/
Optional<String> username();

/**
* The recovery password for the Connection Manager
*/
Optional<String> password();

/**
* The recovery security domain for the Connection Manager
*/
Optional<String> securityDomain();
}

/**
* The pool configuration for the Connection Manager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.DeploymentException;
import jakarta.inject.Inject;
import jakarta.resource.ResourceException;
import jakarta.resource.spi.ManagedConnectionFactory;
import jakarta.resource.spi.ResourceAdapter;

import org.jboss.jca.core.connectionmanager.ConnectionManager;
import org.jboss.jca.core.connectionmanager.TxConnectionManager;

import io.quarkiverse.ironjacamar.ResourceAdapterFactory;
import io.quarkiverse.ironjacamar.ResourceAdapterKind;
Expand All @@ -34,6 +34,9 @@ public class IronJacamarSupport {
@Inject
ConnectionManagerFactory connectionManagerFactory;

@Inject
TransactionRecoveryManager transactionRecoveryManager;

@Inject
@Any
Instance<IronJacamarContainer> containers;
Expand All @@ -48,17 +51,29 @@ public IronJacamarContainer createContainer(String id, String kind) {
var adapterRuntimeConfig = runtimeConfig.resourceAdapters().get(id);
ResourceAdapter resourceAdapter;
ManagedConnectionFactory managedConnectionFactory;
ConnectionManager connectionManager;
IronJacamarRuntimeConfig.ResourceAdapterConfig ra = adapterRuntimeConfig.ra();
try {
resourceAdapter = resourceAdapterFactory.createResourceAdapter(id, ra.config());
managedConnectionFactory = resourceAdapterFactory.createManagedConnectionFactory(id, resourceAdapter);
connectionManager = connectionManagerFactory.createConnectionManager(id, managedConnectionFactory, ra.cm());
// Register recovery if enabled
if (transactionRecoveryManager.isEnabled()) {
if (connectionManager instanceof TxConnectionManager) {
transactionRecoveryManager.registerForRecovery(managedConnectionFactory,
(TxConnectionManager) connectionManager,
ra.cm().recovery().username().orElse(null),
ra.cm().recovery().password().orElse(null),
ra.cm().recovery().securityDomain().orElse(null));
} else {
QuarkusIronJacamarLogger.log.connectionManagerNotTransactional(id);
}
}
} catch (ResourceException re) {
throw new DeploymentException("Cannot deploy resource adapter", re);
throw QuarkusIronJacamarLogger.log.cannotDeployResourceAdapter(re);
}
ConnectionManager connectionManager = connectionManagerFactory.createConnectionManager(id, managedConnectionFactory,
ra.cm());
return new IronJacamarContainer(resourceAdapterFactory, resourceAdapter, managedConnectionFactory,
connectionManager);
connectionManager, transactionRecoveryManager);
}

public void activateEndpoint(String containerId, String activationSpecConfigId, String endpointClassName,
Expand All @@ -80,7 +95,7 @@ public void activateEndpoint(String containerId, String activationSpecConfigId,
try {
ijContainer.endpointActivation(endpointClass, containerId, config);
} catch (ResourceException e) {
throw new RuntimeException(e);
throw QuarkusIronJacamarLogger.log.cannotActivateEndpoint(e);
}
}
}
Loading

0 comments on commit 07ebc9a

Please sign in to comment.