From 1d890a89523e853012acdd3d4d3d53406be6c760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Tue, 9 Jul 2024 14:13:54 +0200 Subject: [PATCH] Make coroutines with custom AOP aspects work with `@Transactional` Previous to this change, the transactional aspect would supersed the user-defined AspectJ aspect, shortcircuiting to calling the original Kotlin suspending function. This change simplifies the TransactionAspectSupport way of dealing with transactional coroutines, thanks to the fact that lower level support for AOP has been introduced in c8169e5c. Closes gh-33095 --- ...oProxyInterceptorKotlinIntegrationTests.kt | 61 ++++++++++++++++++- .../interceptor/TransactionAspectSupport.java | 43 +------------ .../interceptor/TransactionInterceptor.java | 16 +---- 3 files changed, 62 insertions(+), 58 deletions(-) diff --git a/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt b/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt index 4020eae298ee..8c40fe433e73 100644 --- a/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt +++ b/integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt @@ -20,6 +20,9 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.aopalliance.intercept.MethodInterceptor import org.aopalliance.intercept.MethodInvocation +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.Around +import org.aspectj.lang.annotation.Aspect import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig @@ -28,10 +31,18 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.EnableAspectJAutoProxy +import org.springframework.stereotype.Component import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.junit.jupiter.SpringJUnitConfig +import org.springframework.transaction.annotation.EnableTransactionManagement +import org.springframework.transaction.annotation.Transactional +import org.springframework.transaction.testfixture.ReactiveCallCountingTransactionManager import reactor.core.publisher.Mono import java.lang.reflect.Method +import kotlin.annotation.AnnotationTarget.ANNOTATION_CLASS +import kotlin.annotation.AnnotationTarget.CLASS +import kotlin.annotation.AnnotationTarget.FUNCTION +import kotlin.annotation.AnnotationTarget.TYPE /** @@ -43,7 +54,9 @@ import java.lang.reflect.Method class AspectJAutoProxyInterceptorKotlinIntegrationTests( @Autowired val echo: Echo, @Autowired val firstAdvisor: TestPointcutAdvisor, - @Autowired val secondAdvisor: TestPointcutAdvisor) { + @Autowired val secondAdvisor: TestPointcutAdvisor, + @Autowired val countingAspect: CountingAspect, + @Autowired val reactiveTransactionManager: ReactiveCallCountingTransactionManager) { @Test fun `Multiple interceptors with regular function`() { @@ -67,8 +80,22 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) } } + @Test // gh-33095 + fun `Aspect and reactive transactional with suspending function`() { + assertThat(countingAspect.counter).isZero() + assertThat(reactiveTransactionManager.commits).isZero() + val value = "Hello!" + runBlocking { + assertThat(echo.suspendingTransactionalEcho(value)).isEqualTo(value) + } + assertThat(countingAspect.counter).`as`("aspect applied").isOne() + assertThat(reactiveTransactionManager.begun).isOne() + assertThat(reactiveTransactionManager.commits).`as`("transactional applied").isOne() + } + @Configuration @EnableAspectJAutoProxy + @EnableTransactionManagement open class InterceptorConfig { @Bean @@ -77,6 +104,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( @Bean open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 } + @Bean + open fun countingAspect() = CountingAspect() + + @Bean + open fun transactionManager(): ReactiveCallCountingTransactionManager { + return ReactiveCallCountingTransactionManager() + } @Bean open fun echo(): Echo { @@ -107,6 +141,24 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( } } + @Target(CLASS, FUNCTION, ANNOTATION_CLASS, TYPE) + @Retention(AnnotationRetention.RUNTIME) + annotation class Counting() + + @Aspect + @Component + class CountingAspect { + + var counter: Long = 0 + + @Around("@annotation(org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.Counting)") + fun logging(joinPoint: ProceedingJoinPoint): Any { + return (joinPoint.proceed(joinPoint.args) as Mono<*>).doOnTerminate { + counter++ + } + } + } + open class Echo { open fun echo(value: String): String { @@ -118,6 +170,13 @@ class AspectJAutoProxyInterceptorKotlinIntegrationTests( return value } + @Transactional + @Counting + open suspend fun suspendingTransactionalEcho(value: String): String { + delay(1) + return value + } + } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 08e39499b2e9..cbad1486d7a8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -23,12 +23,8 @@ import java.util.concurrent.Future; import io.vavr.control.Try; -import kotlin.coroutines.Continuation; -import kotlin.coroutines.CoroutineContext; -import kotlinx.coroutines.Job; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,7 +32,6 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; -import org.springframework.core.CoroutinesUtils; import org.springframework.core.KotlinDetector; import org.springframework.core.MethodParameter; import org.springframework.core.NamedThreadLocal; @@ -355,10 +350,6 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class targe boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method); boolean hasSuspendingFlowReturnType = isSuspendingFunction && COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()); - if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) { - throw new IllegalStateException("Coroutines invocation not supported: " + method); - } - CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null); ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> { Class reactiveType = @@ -371,11 +362,7 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class targe return new ReactiveTransactionSupport(adapter); }); - InvocationCallback callback = invocation; - if (corInv != null) { - callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv); - } - return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm); + return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); @@ -829,22 +816,6 @@ protected interface InvocationCallback { } - /** - * Coroutines-supporting extension of the callback interface. - */ - protected interface CoroutinesInvocationCallback extends InvocationCallback { - - Object getTarget(); - - Object[] getArguments(); - - default Object getContinuation() { - Object[] args = getArguments(); - return args[args.length - 1]; - } - } - - /** * Internal holder class for a Throwable in a callback transaction model. */ @@ -891,18 +862,6 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt } } - /** - * Inner class to avoid a hard dependency on Kotlin at runtime. - */ - private static class KotlinDelegate { - - public static Publisher invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) { - CoroutineContext coroutineContext = ((Continuation) callback.getContinuation()).getContext().minusKey(Job.Key); - return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments()); - } - - } - /** * Delegate for Reactor-based management of transactional methods with a diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java index 788c1f251994..863c23506cda 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionInterceptor.java @@ -116,21 +116,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable { Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... - return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { - @Override - @Nullable - public Object proceedWithInvocation() throws Throwable { - return invocation.proceed(); - } - @Override - public Object getTarget() { - return invocation.getThis(); - } - @Override - public Object[] getArguments() { - return invocation.getArguments(); - } - }); + return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }