From bd2b96f2a7cc5467000f14bf039bb2e8fc24f310 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 17 Nov 2022 17:41:15 +0800 Subject: [PATCH] [improve][client] Move acknowledge APIs to another interface and improve docs ### Motivation Currently there are many acknowledge methods in `Consumer` interface, including: - 7 overloads of `acknowledgeAsync` - 4 overloads of `acknowledge` - 3 overloads of `acknowledgeCumulativeAsync` - 2 overloads of `acknowledgeCumulative` The JavaDocs of these APIs are also massive and duplicated. For example, some terms are not for application users like "pending ack", which should not appear in API Docs. ### Modifications - Add a new interface `MessageAcknowledger` and move the acknowledge APIs into this new interface. - Improve the API docs. For some overload methods, change them to default methods so that duplicated description can be avoided. The new API docs should be much more clear than before. Though this PR adds a new API, the API compatibility is guaranteed. --- .../apache/pulsar/client/api/Consumer.java | 228 +----------------- .../client/api/MessageAcknowledger.java | 156 ++++++++++++ 2 files changed, 157 insertions(+), 227 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index a25beef801110..2d1b2982d1b49 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,12 +19,10 @@ package org.apache.pulsar.client.api; import java.io.Closeable; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -35,7 +33,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public interface Consumer extends Closeable { +public interface Consumer extends Closeable, MessageAcknowledger { /** * Get a topic for the consumer. @@ -156,42 +154,6 @@ public interface Consumer extends Closeable { */ CompletableFuture> batchReceiveAsync(); - /** - * Acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(Message message) throws PulsarClientException; - - /** - * Acknowledge the consumption of a single message, identified by its {@link MessageId}. - * - * @param messageId - * The {@link MessageId} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(MessageId messageId) throws PulsarClientException; - - /** - * Acknowledge the consumption of {@link Messages}. - * - * @param messages messages - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(Messages messages) throws PulsarClientException; - - /** - * Acknowledge the consumption of a list of message. - * @param messageIdList - * @throws PulsarClientException - */ - void acknowledge(List messageIdList) throws PulsarClientException; - /** * Acknowledge the failure to process a single message. * @@ -356,78 +318,6 @@ void reconsumeLater(Message message, */ void reconsumeLater(Messages messages, long delayTime, TimeUnit unit) throws PulsarClientException; - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - *

It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(Message message) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - *

It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this - * transaction, it will store in transaction pending ack. - * - *

After the transaction commit, the end of previous transaction acked message until this transaction - * acked message will actually ack. - * - *

After the transaction abort, the end of previous transaction acked message until this transaction - * acked message will be redelivered to this consumer. - * - *

Cumulative acknowledge with transaction only support cumulative ack and now have not support individual and - * cumulative ack sharing. - * - *

If cumulative ack with a transaction success, we can cumulative ack messageId with the same transaction - * more than previous messageId. - * - *

It will not be allowed to cumulative ack with a transaction different from the previous one when the previous - * transaction haven't commit or abort. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @param txn {@link Transaction} the transaction to cumulative ack - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result - * - * @since 2.7.0 - */ - CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, - Transaction txn); - /** * reconsumeLater the reception of all the messages in the stream up to (and including) the provided message. * @@ -442,98 +332,6 @@ CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, */ void reconsumeLaterCumulative(Message message, long delayTime, TimeUnit unit) throws PulsarClientException; - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(Message message); - - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param messageId - * The {@code MessageId} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(MessageId messageId); - - /** - * Asynchronously acknowledge the consumption of a single message, it will store in pending ack. - * - *

After the transaction commit, the message will actually ack. - * - *

After the transaction abort, the message will be redelivered. - * - * @param messageId {@link MessageId} to be individual acknowledged - * @param txn {@link Transaction} the transaction to cumulative ack - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId has been acked by another transaction - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * don't find batch size in consumer pending ack - * @return {@link CompletableFuture} the future of the ack result - * - * @since 2.7.0 - */ - CompletableFuture acknowledgeAsync(MessageId messageId, Transaction txn); - - /** - * Asynchronously acknowledge the consumption of {@link Messages}. - * - * @param messages - * The {@link Messages} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(Messages messages); - - - /** - * Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack. - * After the transaction commit, the message will actually ack. - * After the transaction abort, the message will be redelivered. - * @param messages - * The {@link Messages} to be acknowledged - * @param txn {@link Transaction} The transaction to ack messages. - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result - * */ - CompletableFuture acknowledgeAsync(Messages messages, Transaction txn); - - - /** - * Asynchronously acknowledge the consumption of a list of message. - * @param messageIdList - * @return - */ - CompletableFuture acknowledgeAsync(List messageIdList); - - - /** - * Acknowledge the consumption of a list of message, it will store in pending ack. - * After the transaction commit, the message will actually ack. - * After the transaction abort, the message will be redelivered. - * @param messageIdList A list of message Id. - * @param txn {@link Transaction} The transaction to ack messages. - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException - * if the ack with messageId is less than the messageId in pending ack state or ack with transaction is - * different from the transaction in pending ack. - * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException - * broker don't support transaction - * @return {@link CompletableFuture} the future of the ack result */ - CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn); /** * Asynchronously reconsumeLater the consumption of a single message. @@ -578,30 +376,6 @@ CompletableFuture reconsumeLaterAsync(Message message, */ CompletableFuture reconsumeLaterAsync(Messages messages, long delayTime, TimeUnit unit); - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(Message message); - - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(MessageId messageId); - /** * Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided * message. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java new file mode 100644 index 0000000000000..c0a53983c5adb --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.transaction.Transaction; + +/** + * The interface to acknowledge one or more messages individually or cumulatively. + *

+ * It contains two methods of various overloads: + * - `acknowledge`: acknowledge individually + * - `acknowledgeCumulative`: acknowledge cumulatively + * Each of them has an associated asynchronous API that has the "Async" suffix in the name. + *

+ * The 1st method parameter is + * - {@link MessageId} or {@link Message} when acknowledging a single message + * - {@link java.util.List} or {@link Messages} when acknowledging multiple messages + *

+ * The 2nd method parameter is optional. Specify a non-null {@link Transaction} instance for transaction usages: + * - After the transaction is committed, the message will be actually acknowledged (individually or cumulatively). + * - After the transaction is aborted, the message will be redelivered. + * @see Transaction#commit() + * @see Transaction#abort() + */ +public interface MessageAcknowledger { + + /** + * Acknowledge the consumption of a single message. + * + * @param messageId {@link MessageId} to be individual acknowledged + * + * @throws PulsarClientException.AlreadyClosedException} + * if the consumer was already closed + */ + void acknowledge(MessageId messageId) throws PulsarClientException; + + default void acknowledge(Message message) throws PulsarClientException { + acknowledge(message.getMessageId()); + } + + /** + * Acknowledge the consumption of a list of message. + * @param messageIdList the list of message IDs. + */ + void acknowledge(List messageIdList) throws PulsarClientException; + + default void acknowledge(Messages messages) throws PulsarClientException { + for (Message message : messages) { + acknowledge(message.getMessageId()); + } + } + + /** + * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. + * + *

This method will block until the acknowledge has been sent to the broker. After that, the messages will not be + * re-delivered to this consumer. + * + *

Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. + * + *

It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered. + * + * @param messageId + * The {@code MessageId} to be cumulatively acknowledged + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; + + default void acknowledgeCumulative(Message message) throws PulsarClientException { + acknowledgeCumulative(message.getMessageId()); + } + + /** + * The asynchronous version of {@link #acknowledge(MessageId)} with transaction support. + */ + CompletableFuture acknowledgeAsync(MessageId messageId, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledge(MessageId)}. + */ + default CompletableFuture acknowledgeAsync(MessageId messageId) { + return acknowledgeAsync(messageId, null); + } + + /** + * The asynchronous version of {@link #acknowledge(List)} with transaction support. + */ + CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledge(List)}. + */ + CompletableFuture acknowledgeAsync(List messageIdList); + + /** + * The asynchronous version of {@link #acknowledge(Message)}. + */ + CompletableFuture acknowledgeAsync(Message message); + + /** + * The asynchronous version of {@link #acknowledge(Messages)}. + */ + CompletableFuture acknowledgeAsync(Messages messages); + + /** + * The asynchronous version of {@link #acknowledge(Messages)} with transaction support. + */ + CompletableFuture acknowledgeAsync(Messages messages, Transaction txn); + + /** + * The asynchronous version of {@link #acknowledgeCumulative(MessageId)} with transaction support. + * + * @apiNote It's not allowed to cumulative ack with a transaction different from the previous one when the previous + * transaction is not committed or aborted. + * @apiNote It cannot be used for {@link SubscriptionType#Shared} subscription. + * + * @param messageId + * The {@code MessageId} to be cumulatively acknowledged + * @param txn {@link Transaction} the transaction to cumulative ack + */ + CompletableFuture acknowledgeCumulativeAsync(MessageId messageId, + Transaction txn); + + /** + * The asynchronous version of {@link #acknowledgeCumulative(Message)}. + */ + default CompletableFuture acknowledgeCumulativeAsync(Message message) { + return acknowledgeCumulativeAsync(message.getMessageId()); + } + + /** + * The asynchronous version of {@link #acknowledgeCumulative(MessageId)}. + */ + default CompletableFuture acknowledgeCumulativeAsync(MessageId messageId) { + return acknowledgeCumulativeAsync(messageId, null); + } +}