Skip to content

Commit

Permalink
[improve][client] Move acknowledge APIs to another interface and impr…
Browse files Browse the repository at this point in the history
…ove docs (apache#18519)
  • Loading branch information
BewareMyPower authored and lifepuzzlefun committed Jan 10, 2023
1 parent bbf8e26 commit af467fc
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
package org.apache.pulsar.client.api;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand All @@ -35,7 +33,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Consumer<T> extends Closeable {
public interface Consumer<T> extends Closeable, MessageAcknowledger {

/**
* Get a topic for the consumer.
Expand Down Expand Up @@ -156,42 +154,6 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Messages<T>> batchReceiveAsync();

/**
* Acknowledge the consumption of a single message.
*
* @param message
* The {@code Message} to be acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(Message<?> message) throws PulsarClientException;

/**
* Acknowledge the consumption of a single message, identified by its {@link MessageId}.
*
* @param messageId
* The {@link MessageId} to be acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(MessageId messageId) throws PulsarClientException;

/**
* Acknowledge the consumption of {@link Messages}.
*
* @param messages messages
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledge(Messages<?> messages) throws PulsarClientException;

/**
* Acknowledge the consumption of a list of message.
* @param messageIdList
* @throws PulsarClientException
*/
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;

/**
* Acknowledge the failure to process a single message.
*
Expand Down Expand Up @@ -356,78 +318,6 @@ void reconsumeLater(Message<?> message,
*/
void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
*
* <p>This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
* re-delivered to this consumer.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* <p>It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
*
* @param message
* The {@code Message} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledgeCumulative(Message<?> message) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
*
* <p>This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
* re-delivered to this consumer.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* <p>It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
*/
void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;

/**
* Acknowledge the reception of all the messages in the stream up to (and including) the provided message with this
* transaction, it will store in transaction pending ack.
*
* <p>After the transaction commit, the end of previous transaction acked message until this transaction
* acked message will actually ack.
*
* <p>After the transaction abort, the end of previous transaction acked message until this transaction
* acked message will be redelivered to this consumer.
*
* <p>Cumulative acknowledge with transaction only support cumulative ack and now have not support individual and
* cumulative ack sharing.
*
* <p>If cumulative ack with a transaction success, we can cumulative ack messageId with the same transaction
* more than previous messageId.
*
* <p>It will not be allowed to cumulative ack with a transaction different from the previous one when the previous
* transaction haven't commit or abort.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result
*
* @since 2.7.0
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
Transaction txn);

/**
* reconsumeLater the reception of all the messages in the stream up to (and including) the provided message.
*
Expand All @@ -442,98 +332,6 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;

/**
* Asynchronously acknowledge the consumption of a single message.
*
* @param message
* The {@code Message} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(Message<?> message);

/**
* Asynchronously acknowledge the consumption of a single message.
*
* @param messageId
* The {@code MessageId} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(MessageId messageId);

/**
* Asynchronously acknowledge the consumption of a single message, it will store in pending ack.
*
* <p>After the transaction commit, the message will actually ack.
*
* <p>After the transaction abort, the message will be redelivered.
*
* @param messageId {@link MessageId} to be individual acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId has been acked by another transaction
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* don't find batch size in consumer pending ack
* @return {@link CompletableFuture} the future of the ack result
*
* @since 2.7.0
*/
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);

/**
* Asynchronously acknowledge the consumption of {@link Messages}.
*
* @param messages
* The {@link Messages} to be acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);


/**
* Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messages
* The {@link Messages} to be acknowledged
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result
* */
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn);


/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
* @return
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);


/**
* Acknowledge the consumption of a list of message, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messageIdList A list of message Id.
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result */
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn);

/**
* Asynchronously reconsumeLater the consumption of a single message.
Expand Down Expand Up @@ -578,30 +376,6 @@ CompletableFuture<Void> reconsumeLaterAsync(Message<?> message,
*/
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit);

/**
* Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
* message.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param message
* The {@code Message} to be cumulatively acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);

/**
* Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided
* message.
*
* <p>Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
*
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @return a future that can be used to track the completion of the operation
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);

/**
* Asynchronously ReconsumeLater the reception of all the messages in the stream up to (and including) the provided
* message.
Expand Down
Loading

0 comments on commit af467fc

Please sign in to comment.