Skip to content

Commit

Permalink
spring-projectsGH-1473: Switch to CompletableFuture
Browse files Browse the repository at this point in the history
Resolves spring-projects#1473

Given the stability of the project, it was simplest to copy the `AsyncRabbitTemplate`
rather than adding a lot of conditional code.

**2.4.x only; I will submit a separate PR for main**

spring-projectsGH-1473: Fix Unused Import

spring-projectsGH-1473: Move RabbitFutures to Top Level Classes

- to aid migration from 2.4.x to 3.0.x so that the return types will not change

* Increase test coverage.
* Fix since.
* Add author to new files.

Remove async template; rename async template2.
  • Loading branch information
garyrussell committed Jul 28, 2022
1 parent 8becb89 commit 1c2b5b9
Show file tree
Hide file tree
Showing 23 changed files with 590 additions and 322 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,16 @@

package org.springframework.amqp.core;

import java.util.concurrent.CompletableFuture;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.concurrent.ListenableFuture;

/**
* Classes implementing this interface can perform asynchronous send and
* receive operations.
* receive operations using {@link CompletableFuture}s.
*
* @author Gary Russell
* @since 2.0
* @since 2.4.7
*
*/
public interface AsyncAmqpTemplate {
Expand All @@ -33,47 +34,47 @@ public interface AsyncAmqpTemplate {
* Send a message to the default exchange with the default routing key. If the message
* contains a correlationId property, it must be unique.
* @param message the message.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
ListenableFuture<Message> sendAndReceive(Message message);
CompletableFuture<Message> sendAndReceive(Message message);

/**
* Send a message to the default exchange with the supplied routing key. If the message
* contains a correlationId property, it must be unique.
* @param routingKey the routing key.
* @param message the message.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
ListenableFuture<Message> sendAndReceive(String routingKey, Message message);
CompletableFuture<Message> sendAndReceive(String routingKey, Message message);

/**
* Send a message to the supplied exchange and routing key. If the message
* contains a correlationId property, it must be unique.
* @param exchange the exchange.
* @param routingKey the routing key.
* @param message the message.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
ListenableFuture<Message> sendAndReceive(String exchange, String routingKey, Message message);
CompletableFuture<Message> sendAndReceive(String exchange, String routingKey, Message message);

/**
* Convert the object to a message and send it to the default exchange with the
* default routing key.
* @param object the object to convert.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(Object object);
<C> CompletableFuture<C> convertSendAndReceive(Object object);

/**
* Convert the object to a message and send it to the default exchange with the
* provided routing key.
* @param routingKey the routing key.
* @param object the object to convert.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(String routingKey, Object object);
<C> CompletableFuture<C> convertSendAndReceive(String routingKey, Object object);

/**
* Convert the object to a message and send it to the provided exchange and
Expand All @@ -82,9 +83,9 @@ public interface AsyncAmqpTemplate {
* @param routingKey the routing key.
* @param object the object to convert.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object);
<C> CompletableFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object);

/**
* Convert the object to a message and send it to the default exchange with the
Expand All @@ -93,9 +94,9 @@ public interface AsyncAmqpTemplate {
* @param object the object to convert.
* @param messagePostProcessor the post processor.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor);
<C> CompletableFuture<C> convertSendAndReceive(Object object, MessagePostProcessor messagePostProcessor);

/**
* Convert the object to a message and send it to the default exchange with the
Expand All @@ -105,9 +106,9 @@ public interface AsyncAmqpTemplate {
* @param object the object to convert.
* @param messagePostProcessor the post processor.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceive(String routingKey, Object object,
MessagePostProcessor messagePostProcessor);

/**
Expand All @@ -119,9 +120,9 @@ <C> ListenableFuture<C> convertSendAndReceive(String routingKey, Object object,
* @param object the object to convert.
* @param messagePostProcessor the post processor.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceive(String exchange, String routingKey, Object object,
MessagePostProcessor messagePostProcessor);

/**
Expand All @@ -130,9 +131,9 @@ <C> ListenableFuture<C> convertSendAndReceive(String exchange, String routingKey
* @param object the object to convert.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType);
<C> CompletableFuture<C> convertSendAndReceiveAsType(Object object, ParameterizedTypeReference<C> responseType);

/**
* Convert the object to a message and send it to the default exchange with the
Expand All @@ -141,9 +142,9 @@ <C> ListenableFuture<C> convertSendAndReceive(String exchange, String routingKey
* @param object the object to convert.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceiveAsType(String routingKey, Object object,
ParameterizedTypeReference<C> responseType);

/**
Expand All @@ -154,9 +155,9 @@ <C> ListenableFuture<C> convertSendAndReceiveAsType(String routingKey, Object ob
* @param object the object to convert.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object,
ParameterizedTypeReference<C> responseType);

/**
Expand All @@ -167,9 +168,9 @@ <C> ListenableFuture<C> convertSendAndReceiveAsType(String exchange, String rout
* @param messagePostProcessor the post processor.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor,
<C> CompletableFuture<C> convertSendAndReceiveAsType(Object object, MessagePostProcessor messagePostProcessor,
ParameterizedTypeReference<C> responseType);

/**
Expand All @@ -181,9 +182,9 @@ <C> ListenableFuture<C> convertSendAndReceiveAsType(Object object, MessagePostPr
* @param messagePostProcessor the post processor.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceiveAsType(String routingKey, Object object,
MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType);

/**
Expand All @@ -196,9 +197,9 @@ <C> ListenableFuture<C> convertSendAndReceiveAsType(String routingKey, Object ob
* @param messagePostProcessor the post processor.
* @param responseType the response type.
* @param <C> the expected result type.
* @return the {@link ListenableFuture}.
* @return the {@link CompletableFuture}.
*/
<C> ListenableFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object,
<C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, String routingKey, Object object,
MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<C> responseType);

}
Loading

0 comments on commit 1c2b5b9

Please sign in to comment.