Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] Move acknowledge APIs to another interface and improve docs #18519

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.client.api;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand All @@ -35,7 +33,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Consumer<T> extends Closeable {
public interface Consumer<T> extends Closeable, MessageAcknowledger {

/**
* Get a topic for the consumer.
Expand Down Expand Up @@ -156,42 +154,6 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Messages<T>> batchReceiveAsync();

/**
* Acknowledge the consumption of a single message.
*
* @param message
* The {@code Message} to be acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(Message<?> message) throws PulsarClientException;

/**
* Acknowledge the consumption of a single message, identified by its {@link MessageId}.
*
* @param messageId
* The {@link MessageId} to be acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(MessageId messageId) throws PulsarClientException;

/**
* Acknowledge the consumption of {@link Messages}.
*
* @param messages messages
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(Messages<?> messages) throws PulsarClientException;

/**
* Acknowledge the consumption of a list of message.
* @param messageIdList
* @throws PulsarClientException
*/
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;

/**
* Acknowledge the failure to process a single message.
*
Expand Down Expand Up @@ -356,78 +318,6 @@ void reconsumeLater(Message<?> message,
*/
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
*
* <p>This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
* re-delivered to this consumer.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* <p>It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
*
* @param message
* The {@code Message} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledgeCumulative(Message<?> message) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
*
* <p>This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
* re-delivered to this consumer.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* <p>It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this
* transaction, it will store in transaction pending ack.
*
* <p>After the transaction commit, the end of previous transaction acked message until this transaction
* acked message will actually ack.
*
* <p>After the transaction abort, the end of previous transaction acked message until this transaction
* acked message will be redelivered to this consumer.
*
* <p>Cumulative acknowledge with transaction only support cumulative ack and now have not support individual and
* cumulative ack sharing.
*
* <p>If cumulative ack with a transaction success, we can cumulative ack messageId with the same transaction
* more than previous messageId.
*
* <p>It will not be allowed to cumulative ack with a transaction different from the previous one when the previous
* transaction haven't commit or abort.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result
*
* @since 2.7.0
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
Transaction txn);

/**
* reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
*
Expand All @@ -442,98 +332,6 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* Asynchronously acknowledge the consumption of a single message.
*
* @param message
* The {@code Message} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(Message<?> message);

/**
* Asynchronously acknowledge the consumption of a single message.
*
* @param messageId
* The {@code MessageId} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(MessageId messageId);

/**
* Asynchronously acknowledge the consumption of a single message, it will store in pending ack.
*
* <p>After the transaction commit, the message will actually ack.
*
* <p>After the transaction abort, the message will be redelivered.
*
* @param messageId {@link MessageId} to be individual acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId has been acked by another transaction
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* don't find batch size in consumer pending ack
* @return {@link CompletableFuture} the future of the ack result
*
* @since 2.7.0
*/
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);

/**
* Asynchronously acknowledge the consumption of {@link Messages}.
*
* @param messages
* The {@link Messages} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);


/**
* Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messages
* The {@link Messages} to be acknowledged
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result
* */
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn);


/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
* @return
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);


/**
* Acknowledge the consumption of a list of message, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messageIdList A list of message Id.
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result */
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn);

/**
* Asynchronously reconsumeLater the consumption of a single message.
Expand Down Expand Up @@ -578,30 +376,6 @@ CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,
*/
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit);

/**
* Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
* message.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param message
* The {@code Message} to be cumulatively acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);

/**
* Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
* message.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);

/**
* Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided
* message.
Expand Down
Loading