diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index f727f21689bec..3fbab236a60ee 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,12 +19,10 @@ package org.apache.pulsar.client.api; import java.io.Closeable; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -35,7 +33,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface Consumer extends Closeable { +public interface Consumer extends Closeable, MessageAcknowledger { /** * Get a topic for the consumer. @@ -156,42 +154,6 @@ public interface Consumer extends Closeable { */ CompletableFuture> batchReceiveAsync(); - /** - * Acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(Message message) throws PulsarClientException; - - /** - * Acknowledge the consumption of a single message, identified by its {@link MessageId}. - * - * @param messageId - * The {@link MessageId} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(MessageId messageId) throws PulsarClientException; - - /** - * Acknowledge the consumption of {@link Messages}. - * - * @param messages messages - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(Messages messages) throws PulsarClientException; - - /** - * Acknowledge the consumption of a list of message. - * @param messageIdList - * @throws PulsarClientException - */ - void acknowledge(List messageIdList) throws PulsarClientException; - /** * Acknowledge the failure to process a single message. * @@ -356,78 +318,6 @@ void reconsumeLater(Message message, */ void reconsumeLater(Messages messages, long delayTime, TimeUnit unit) throws PulsarClientException; - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - *

It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(Message message) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - *

It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this - * transaction, it will store in transaction pending ack. - * - *

After the transaction commit, the end of previous transaction acked message until this transaction - * acked message will actually ack. - * - *

After the transaction abort, the end of previous transaction acked message until this transaction - * acked message will be redelivered to this consumer. - * - *

Cumulative acknowledge with transaction only support cumulative ack and now have not support individual and - * cumulative ack sharing. - * - *

If cumulative ack with a transaction success, we can cumulative ack messageId with the same transaction - * more than previous messageId. - * - *

It will not be allowed to cumulative ack with a transaction different from the previous one when the previous - * transaction haven't commit or abort. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @param txn {@link Transaction} the transaction to cumulative ack - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result - * - * @since 2.7.0 - */ - CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, - Transaction txn); - /** * reconsumeLater the reception of all the messages in the stream up to (and including) the provided message. * @@ -442,98 +332,6 @@ CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, */ void reconsumeLaterCumulative(Message message, long delayTime, TimeUnit unit) throws PulsarClientException; - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(Message message); - - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param messageId - * The {@code MessageId} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(MessageId messageId); - - /** - * Asynchronously acknowledge the consumption of a single message, it will store in pending ack. - * - *

After the transaction commit, the message will actually ack. - * - *

After the transaction abort, the message will be redelivered. - * - * @param messageId {@link MessageId} to be individual acknowledged - * @param txn {@link Transaction} the transaction to cumulative ack - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId has been acked by another transaction - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * don't find batch size in consumer pending ack - * @return {@link CompletableFuture} the future of the ack result - * - * @since 2.7.0 - */ - CompletableFuture acknowledgeAsync(MessageId messageId, Transaction txn); - - /** - * Asynchronously acknowledge the consumption of {@link Messages}. - * - * @param messages - * The {@link Messages} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(Messages messages); - - - /** - * Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack. - * After the transaction commit, the message will actually ack. - * After the transaction abort, the message will be redelivered. - * @param messages - * The {@link Messages} to be acknowledged - * @param txn {@link Transaction} The transaction to ack messages. - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result - * */ - CompletableFuture acknowledgeAsync(Messages messages, Transaction txn); - - - /** - * Asynchronously acknowledge the consumption of a list of message. - * @param messageIdList - * @return - */ - CompletableFuture acknowledgeAsync(List messageIdList); - - - /** - * Acknowledge the consumption of a list of message, it will store in pending ack. - * After the transaction commit, the message will actually ack. - * After the transaction abort, the message will be redelivered. - * @param messageIdList A list of message Id. - * @param txn {@link Transaction} The transaction to ack messages. - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result */ - CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn); /** * Asynchronously reconsumeLater the consumption of a single message. @@ -578,30 +376,6 @@ CompletableFuture reconsumeLaterAsync(Message message, */ CompletableFuture reconsumeLaterAsync(Messages messages, long delayTime, TimeUnit unit); - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(Message message); - - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(MessageId messageId); - /** * Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided * message. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java new file mode 100644 index 0000000000000..c0a53983c5adb --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.transaction.Transaction; + +/** + * The interface to acknowledge one or more messages individually or cumulatively. + *

+ * It contains two methods of various overloads: + * - `acknowledge`: acknowledge individually + * - `acknowledgeCumulative`: acknowledge cumulatively + * Each of them has an associated asynchronous API that has the "Async" suffix in the name. + *

+ * The 1st method parameter is + * - {@link MessageId} or {@link Message} when acknowledging a single message + * - {@link java.util.List} or {@link Messages} when acknowledging multiple messages + *

+ * The 2nd method parameter is optional. Specify a non-null {@link Transaction} instance for transaction usages: + * - After the transaction is committed, the message will be actually acknowledged (individually or cumulatively). + * - After the transaction is aborted, the message will be redelivered. + * @see Transaction#commit() + * @see Transaction#abort() + */ +public interface MessageAcknowledger { + + /** + * Acknowledge the consumption of a single message. + * + * @param messageId {@link MessageId} to be individual acknowledged + * + * @throws PulsarClientException.AlreadyClosedException} + * if the consumer was already closed + */ + void acknowledge(MessageId messageId) throws PulsarClientException; + + default void acknowledge(Message message) throws PulsarClientException { + acknowledge(message.getMessageId()); + } + + /** + * Acknowledge the consumption of a list of message. + * @param messageIdList the list of message IDs. + */ + void acknowledge(List messageIdList) throws PulsarClientException; + + default void acknowledge(Messages messages) throws PulsarClientException { + for (Message message : messages) { + acknowledge(message.getMessageId()); + } + } + + /** + * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. + * + *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be + * re-delivered to this consumer. + * + *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. + * + *

It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered. + * + * @param messageId + * The {@code MessageId} to be cumulatively acknowledged + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; + + default void acknowledgeCumulative(Message message) throws PulsarClientException { + acknowledgeCumulative(message.getMessageId()); + } + + /** + * The asynchronous version of {@link #acknowledge(MessageId)} with transaction support. + */ + CompletableFuture acknowledgeAsync(MessageId messageId, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledge(MessageId)}. + */ + default CompletableFuture acknowledgeAsync(MessageId messageId) { + return acknowledgeAsync(messageId, null); + } + + /** + * The asynchronous version of {@link #acknowledge(List)} with transaction support. + */ + CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledge(List)}. + */ + CompletableFuture acknowledgeAsync(List messageIdList); + + /** + * The asynchronous version of {@link #acknowledge(Message)}. + */ + CompletableFuture acknowledgeAsync(Message message); + + /** + * The asynchronous version of {@link #acknowledge(Messages)}. + */ + CompletableFuture acknowledgeAsync(Messages messages); + + /** + * The asynchronous version of {@link #acknowledge(Messages)} with transaction support. + */ + CompletableFuture acknowledgeAsync(Messages messages, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledgeCumulative(MessageId)} with transaction support. + * + * @apiNote It's not allowed to cumulative ack with a transaction different from the previous one when the previous + * transaction is not committed or aborted. + * @apiNote It cannot be used for {@link SubscriptionType#Shared} subscription. + * + * @param messageId + * The {@code MessageId} to be cumulatively acknowledged + * @param txn {@link Transaction} the transaction to cumulative ack + */ + CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, + Transaction txn); + + /** + * The asynchronous version of {@link #acknowledgeCumulative(Message)}. + */ + default CompletableFuture acknowledgeCumulativeAsync(Message message) { + return acknowledgeCumulativeAsync(message.getMessageId()); + } + + /** + * The asynchronous version of {@link #acknowledgeCumulative(MessageId)}. + */ + default CompletableFuture acknowledgeCumulativeAsync(MessageId messageId) { + return acknowledgeCumulativeAsync(messageId, null); + } +}