Skip to content

Commit

Permalink
Fix deprecations around ListenableFuture (#3865)
Browse files Browse the repository at this point in the history
* Fix deprecations around ListenableFuture

SF has deprecated a `ListenableFuture` and API around it

* Migrate to `CompletableFuture` everywhere a `ListenableFuture` has been used
* Suppress a deprecation for `ListenableFuture` keeping the functionality until the next version
* Resolve deprecations nad removals from the latest Spring for Apache Kafka
* Fix documentation for the `ListenableFuture` in favor of `CompletableFuture`

NOTE: the AMQP module is left as is until `ListenableFuture` deprecation is resolved in Spring AMQP

* * Restore some `ListenableFuture` test for messaging gateway
  • Loading branch information
artembilan authored Jul 28, 2022
1 parent 2e9bead commit 5572c21
Show file tree
Hide file tree
Showing 23 changed files with 196 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand All @@ -49,7 +48,6 @@
import org.springframework.beans.factory.FactoryBean;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
Expand Down Expand Up @@ -145,10 +143,6 @@ public class GatewayProxyFactoryBean extends AbstractEndpoint

private boolean asyncExecutorExplicitlySet;

private Class<?> asyncSubmitType;

private Class<?> asyncSubmitListenableType;

private volatile boolean initialized;

private Map<String, GatewayMethodMetadata> methodMetadataMap;
Expand Down Expand Up @@ -468,15 +462,6 @@ protected void onInit() {
new ProxyFactory(this.serviceInterface, this);
gatewayProxyFactory.addAdvice(new DefaultMethodInvokingMethodInterceptor());
this.serviceProxy = gatewayProxyFactory.getProxy(this.beanClassLoader);
if (this.asyncExecutor != null) {
Callable<String> task = () -> null;
Future<String> submitType = this.asyncExecutor.submit(task);
this.asyncSubmitType = submitType.getClass();
if (this.asyncExecutor instanceof AsyncListenableTaskExecutor) {
submitType = ((AsyncListenableTaskExecutor) this.asyncExecutor).submitListenable(task);
this.asyncSubmitListenableType = submitType.getClass();
}
}
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(beanFactory);
this.initialized = true;
}
Expand Down Expand Up @@ -511,6 +496,7 @@ public Object getObject() {

@Override
@Nullable
@SuppressWarnings("deprecation")
public Object invoke(final MethodInvocation invocation) throws Throwable { // NOSONAR
final Class<?> returnType;
MethodInvocationGateway gateway = this.gatewayMap.get(invocation.getMethod());
Expand All @@ -522,15 +508,16 @@ public Object invoke(final MethodInvocation invocation) throws Throwable { // NO
}
if (this.asyncExecutor != null && !Object.class.equals(returnType)) {
Invoker invoker = new Invoker(invocation);
if (returnType.isAssignableFrom(this.asyncSubmitType)) {
if (Future.class.equals(returnType)) {
return this.asyncExecutor.submit(invoker::get);
}
else if (returnType.isAssignableFrom(this.asyncSubmitListenableType)) {
return ((AsyncListenableTaskExecutor) this.asyncExecutor).submitListenable(invoker::get);
}
else if (CompletableFuture.class.equals(returnType)) { // exact
return CompletableFuture.supplyAsync(invoker, this.asyncExecutor);
}
else if (org.springframework.util.concurrent.ListenableFuture.class.equals(returnType)) {
return ((org.springframework.core.task.AsyncListenableTaskExecutor) this.asyncExecutor)
.submitListenable(invoker::get);
}
else if (Future.class.isAssignableFrom(returnType)) {
logger.debug(() -> "AsyncTaskExecutor submit*() return types are incompatible with the method return " +
"type; running on calling thread; the downstream flow must return the required Future: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -106,9 +105,9 @@ public void setOutputChannelName(String outputChannelName) {
}

/**
* Allow async replies. If the handler reply is a {@link ListenableFuture}, send
* the output when it is satisfied rather than sending the future as the result.
* Ignored for return types other than {@link ListenableFuture}.
* Allow async replies. If the handler reply is a {@link CompletableFuture} or {@link Publisher},
* send the output when it is satisfied rather than sending the future as the result.
* Ignored for return types other than {@link CompletableFuture} or {@link Publisher}.
* @param async true to allow.
* @since 4.3
*/
Expand Down Expand Up @@ -299,6 +298,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
return replyChannel;
}

@SuppressWarnings("deprecation")
private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHeaders, Object reply,
@Nullable Object replyChannelArg) {

Expand All @@ -307,7 +307,7 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
replyChannel = getOutputChannel();
}

if (this.async && (reply instanceof ListenableFuture<?>
if (this.async && (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>
|| reply instanceof CompletableFuture<?>
|| reply instanceof Publisher<?>)) {

Expand Down Expand Up @@ -351,13 +351,14 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
return builder;
}

@SuppressWarnings("deprecation")
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nullable Object replyChannel) {
CompletableFuture<?> future;
if (reply instanceof CompletableFuture<?>) {
future = (CompletableFuture<?>) reply;
}
else if (reply instanceof ListenableFuture<?>) {
future = ((ListenableFuture<?>) reply).completable();
else if (reply instanceof org.springframework.util.concurrent.ListenableFuture<?>) {
future = ((org.springframework.util.concurrent.ListenableFuture<?>) reply).completable();
}
else {
Mono<?> reactiveReply;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,12 @@
package org.springframework.integration.config.annotation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -33,9 +35,7 @@
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Dave Syer
Expand All @@ -44,7 +44,7 @@
* @author Artem Bilan
* @author Yilin Wei
*/
@RunWith(SpringRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class AnnotatedEndpointActivationTests {

Expand Down Expand Up @@ -72,7 +72,7 @@ public class AnnotatedEndpointActivationTests {
// them will get the message.
private static volatile int count = 0;

@Before
@BeforeEach
public void resetCount() {
count = 0;
}
Expand Down Expand Up @@ -108,11 +108,12 @@ public void sendAndReceiveImplicitInputChannel() {
assertThat(count).isEqualTo(1);
}

@Test(expected = MessageDeliveryException.class)
@Test
@DirtiesContext
public void stopContext() {
applicationContext.stop();
this.input.send(new GenericMessage<>("foo"));
assertThatExceptionOfType(MessageDeliveryException.class)
.isThrownBy(() -> this.input.send(new GenericMessage<>("foo")));
}

@Test
Expand Down Expand Up @@ -159,9 +160,9 @@ public String process(String message) {
private static class AnnotatedEndpoint3 {

@ServiceActivator(inputChannel = "inputAsync", outputChannel = "outputAsync", async = "true")
public ListenableFuture<String> process(String message) {
SettableListenableFuture<String> future = new SettableListenableFuture<>();
future.set(message);
public CompletableFuture<String> process(String message) {
CompletableFuture<String> future = new CompletableFuture<>();
future.complete(message);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,6 @@
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand Down Expand Up @@ -403,7 +402,7 @@ private void startResponder(final PollableChannel requestChannel, final MessageC
.setCorrelationId(request.getHeaders().getId()).build();
Object payload = null;
if (request.getPayload().equals("futureSync")) {
payload = new AsyncResult<Message<?>>(reply);
payload = CompletableFuture.completedFuture(reply);
}
else if (request.getPayload().equals("flowCompletable")) {
payload = CompletableFuture.<String>completedFuture("SYNC_COMPLETABLE");
Expand Down Expand Up @@ -448,7 +447,7 @@ public void setBeanName(String beanName) {
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings("unchecked")
public <T> Future<T> submit(Callable<T> task) {
try {
Future<?> result = super.submit(task);
Expand All @@ -462,7 +461,8 @@ public <T> Future<T> submit(Callable<T> task) {
modifiedMessage = MessageBuilder.fromMessage(message)
.setHeader("executor", this.beanName).build();
}
return new AsyncResult(modifiedMessage);

return (Future<T>) CompletableFuture.completedFuture(modifiedMessage);
}
catch (Exception e) {
throw new IllegalStateException("unexpected exception in testExecutor", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -39,8 +40,6 @@
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -105,22 +104,15 @@ public void listenableFutureWithMessageReturned() throws Exception {
proxyFactory.setBeanFactory(mock(BeanFactory.class));
proxyFactory.afterPropertiesSet();
TestEchoService service = (TestEchoService) proxyFactory.getObject();
ListenableFuture<Message<?>> f = service.returnMessageListenable("foo");
CompletableFuture<Message<?>> f = service.returnMessageListenable("foo");
long start = System.currentTimeMillis();
final AtomicReference<Message<?>> result = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
f.addCallback(new ListenableFutureCallback<Message<?>>() {

@Override
public void onSuccess(Message<?> msg) {
result.set(msg);
f.whenComplete((message, throwable) -> {
if (throwable == null) {
result.set(message);
latch.countDown();
}

@Override
public void onFailure(Throwable t) {
}

});
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
long elapsed = System.currentTimeMillis() - start;
Expand Down Expand Up @@ -300,7 +292,7 @@ private static void startResponder(final PollableChannel requestChannel) {
String header = (String) input.getHeaders().get("method");
if (header != null && header.startsWith("returnCustomFuture")) {
reply = MessageBuilder.withPayload(new CustomFuture(payload,
(Thread) input.getHeaders().get("thread")))
(Thread) input.getHeaders().get("thread")))
.copyHeaders(input.getHeaders())
.build();
}
Expand All @@ -317,7 +309,7 @@ private interface TestEchoService {

Future<?> returnSomething(String s);

ListenableFuture<Message<?>> returnMessageListenable(String s);
CompletableFuture<Message<?>> returnMessageListenable(String s);

@Gateway(headers = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
CustomFuture returnCustomFuture(String s);
Expand Down
Loading

0 comments on commit 5572c21

Please sign in to comment.