From 12e78b26c5aab55439363968084d93f1ddacc753 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 11 Oct 2022 10:14:25 +0800 Subject: [PATCH] [fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout. (#17318) --- .../client/api/MultiTopicsConsumerTest.java | 39 ++++ .../client/impl/MessageRedeliveryTest.java | 196 ++++++++++++++++-- .../pulsar/client/impl/ConsumerBase.java | 9 +- .../pulsar/client/impl/ConsumerImpl.java | 150 ++++++-------- .../client/impl/MultiTopicsConsumerImpl.java | 54 +---- .../impl/UnAckedMessageRedeliveryTracker.java | 43 ++-- 6 files changed, 312 insertions(+), 179 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index 75b315413f59e..d0e8af54b621e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -42,6 +42,7 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -193,4 +194,42 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer() } Assert.assertEquals(numPartitions * numMessages, receivedCount); } + + @Test + public void testBatchReceiveAckTimeout() + throws PulsarAdminException, PulsarClientException { + String topicName = newTopicName(); + int numPartitions = 2; + int numMessages = 100000; + admin.topics().createPartitionedTopic(topicName, numPartitions); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT64) + .topic(topicName) + .enableBatching(false) + .blockIfQueueFull(true) + .create(); + + @Cleanup + Consumer consumer = pulsarClient + .newConsumer(Schema.INT64) + .topic(topicName) + .receiverQueueSize(numMessages) + .batchReceivePolicy( + BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build() + ).ackTimeout(1000, TimeUnit.MILLISECONDS) + .subscriptionName(methodName) + .subscribe(); + + producer.newMessage() + .value(1l) + .send(); + + // first batch receive + Assert.assertEquals(consumer.batchReceive().size(), 1); + // Not ack, trigger redelivery this message. + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(consumer.batchReceive().size(), 1); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index b7ff6b21ce144..4e4e3ac086a2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -22,7 +22,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; -import java.lang.reflect.Field; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -33,17 +32,17 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -226,6 +225,7 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException final String topic = "testDoNotRedeliveryMarkDeleteMessages"; final String subName = "my-sub"; + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName(subName) @@ -233,6 +233,7 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException .ackTimeout(1, TimeUnit.SECONDS) .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) @@ -261,12 +262,15 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ final String topic = "testRedeliveryAddEpoch"; final String subName = "my-sub"; - ConsumerBase consumer = ((ConsumerBase) pulsarClient.newConsumer(Schema.STRING) + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe()); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) @@ -275,14 +279,9 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ String test1 = "Pulsar1"; String test2 = "Pulsar2"; String test3 = "Pulsar3"; - producer.send(test1); - - PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics() - .get(TopicName.get("persistent://public/default/" + topic).toString()).get().get(); - PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = - (PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher(); consumer.setConsumerEpoch(1); + producer.send(test1); Message message = consumer.receive(3, TimeUnit.SECONDS); assertNull(message); consumer.redeliverUnacknowledgedMessages(); @@ -309,39 +308,136 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ message = consumer.receive(3, TimeUnit.SECONDS); assertNull(message); - Field field = consumer.getClass().getDeclaredField("connectionHandler"); - field.setAccessible(true); - ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer); - - field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER"); - field.setAccessible(true); - + ConnectionHandler connectionHandler = consumer.getConnectionHandler(); connectionHandler.cnx().channel().close(); - ((ConsumerImpl) consumer).grabCnx(); + consumer.grabCnx(); + message = consumer.receive(3, TimeUnit.SECONDS); assertNotNull(message); assertEquals(message.getValue(), test3); } + @Test(dataProvider = "enableBatch") + public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception { + final String topic = "testRedeliveryAddEpochAndPermits"; + final String subName = "my-sub"; + // set receive queue size is 4, and first send 4 messages, + // then call redeliver messages, assert receive msg num. + int receiveQueueSize = 4; + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .receiverQueueSize(receiveQueueSize) + .autoScaledReceiverQueueSizeEnabled(false) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(enableBatch) + .create(); + + consumer.setConsumerEpoch(1); + for (int i = 0; i < receiveQueueSize; i++) { + producer.send("pulsar" + i); + } + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + consumer.redeliverUnacknowledgedMessages(); + for (int i = 0; i < receiveQueueSize; i++) { + Message msg = consumer.receive(); + assertEquals("pulsar" + i, msg.getValue()); + } + } + + @Test(dataProvider = "enableBatch") + public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{ + final String topic = "testBatchReceiveRedeliveryAddEpoch"; + final String subName = "my-sub"; + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build()) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(enableBatch) + .create(); + + String test1 = "Pulsar1"; + String test2 = "Pulsar2"; + String test3 = "Pulsar3"; + + consumer.setConsumerEpoch(1); + producer.send(test1); + + Messages messages; + Message message; + + messages = consumer.batchReceive(); + assertEquals(messages.size(), 0); + consumer.redeliverUnacknowledgedMessages(); + messages = consumer.batchReceive(); + assertEquals(messages.size(), 1); + message = messages.iterator().next(); + consumer.acknowledgeCumulativeAsync(message).get(); + assertEquals(message.getValue(), test1); + + consumer.setConsumerEpoch(3); + producer.send(test2); + messages = consumer.batchReceive(); + assertEquals(messages.size(), 0); + consumer.redeliverUnacknowledgedMessages(); + messages = consumer.batchReceive(); + assertEquals(messages.size(), 1); + message = messages.iterator().next(); + assertEquals(message.getValue(), test2); + consumer.acknowledgeCumulativeAsync(message).get(); + + consumer.setConsumerEpoch(6); + producer.send(test3); + messages = consumer.batchReceive(); + assertEquals(messages.size(), 0); + + ConnectionHandler connectionHandler = consumer.getConnectionHandler(); + connectionHandler.cnx().channel().close(); + + consumer.grabCnx(); + messages = consumer.batchReceive(); + assertEquals(messages.size(), 1); + message = messages.iterator().next(); + assertEquals(message.getValue(), test3); + } + @DataProvider(name = "enableBatch") public static Object[][] enableBatch() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - @Test(dataProvider = "enableBatch") public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exception{ final String topic = "testMultiConsumerRedeliveryAddEpoch"; final String subName = "my-sub"; admin.topics().createPartitionedTopic(topic, 5); final int messageNumber = 50; + + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) @@ -382,4 +478,66 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce message = consumer.receive(5, TimeUnit.SECONDS); assertNull(message); } + + @Test(dataProvider = "enableBatch", invocationCount = 10) + public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{ + + final String topic = "testMultiConsumerBatchRedeliveryAddEpoch"; + final String subName = "my-sub"; + admin.topics().createPartitionedTopic(topic, 5); + final int messageNumber = 50; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build()) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(enableBatch) + .create(); + + for (int i = 0; i < messageNumber; i++) { + producer.send("" + i); + } + + int receiveNum = 0; + while (receiveNum < messageNumber) { + receiveNum += consumer.batchReceive().size(); + } + + // redeliverUnacknowledgedMessages once + consumer.redeliverUnacknowledgedMessages(); + + receiveNum = 0; + while (receiveNum < messageNumber) { + Messages messages = consumer.batchReceive(); + receiveNum += messages.size(); + for (Message message : messages) { + assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1); + } + } + + // can't receive message again + assertEquals(consumer.batchReceive().size(), 0); + + // redeliverUnacknowledgedMessages twice + consumer.redeliverUnacknowledgedMessages(); + + receiveNum = 0; + while (receiveNum < messageNumber) { + Messages messages = consumer.batchReceive(); + receiveNum += messages.size(); + for (Message message : messages) { + assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2); + } + } + + // can't receive message again + assertEquals(consumer.batchReceive().size(), 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 4a60dad8bfc15..e66675ceb6c54 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -913,7 +913,7 @@ protected void notifyPendingBatchReceivedCallBack() { reentrantLock.lock(); try { - notifyPendingBatchReceivedCallBack(opBatchReceive); + notifyPendingBatchReceivedCallBack(opBatchReceive.future); } finally { reentrantLock.unlock(); } @@ -941,7 +941,7 @@ private OpBatchReceive nextBatchReceive() { return opBatchReceive; } - protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatchReceive) { + protected final void notifyPendingBatchReceivedCallBack(CompletableFuture> batchReceiveFuture) { MessagesImpl messages = getNewMessagesImpl(); Message msgPeeked = incomingMessages.peek(); while (msgPeeked != null && messages.canAdd(msgPeeked)) { @@ -953,8 +953,7 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatc } msgPeeked = incomingMessages.peek(); } - - completePendingBatchReceive(opBatchReceive.future, messages); + completePendingBatchReceive(batchReceiveFuture, messages); } protected void completePendingBatchReceive(CompletableFuture> future, Messages messages) { @@ -1182,7 +1181,7 @@ protected boolean isValidConsumerEpoch(MessageImpl message) { || getSubType() == CommandSubscribe.SubType.Exclusive) && message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) { - log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], " + log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], " + "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch); message.release(); message.recycle(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 38251a7ad8659..01c49b3396e67 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -64,6 +64,7 @@ import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; +import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -453,9 +454,6 @@ protected Message internalReceive() throws PulsarClientException { } message = incomingMessages.take(); messageProcessed(message); - if (!isValidConsumerEpoch(message)) { - return internalReceive(); - } return beforeConsume(message); } catch (InterruptedException e) { stats.incrementNumReceiveFailed(); @@ -463,10 +461,6 @@ protected Message internalReceive() throws PulsarClientException { } } - private boolean isValidConsumerEpoch(Message message) { - return isValidConsumerEpoch((MessageImpl) message); - } - @Override protected CompletableFuture> internalReceiveAsync() { CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); @@ -479,11 +473,6 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { messageProcessed(message); - if (!isValidConsumerEpoch(message)) { - pendingReceives.add(result); - cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); - return; - } result.complete(beforeConsume(message)); } }); @@ -494,7 +483,6 @@ protected CompletableFuture> internalReceiveAsync() { @Override protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarClientException { Message message; - long callTime = System.nanoTime(); try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -504,15 +492,6 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); - if (!isValidConsumerEpoch(message)) { - long executionTime = System.nanoTime() - callTime; - long timeoutInNanos = unit.toNanos(timeout); - if (executionTime >= timeoutInNanos) { - return null; - } else { - return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS); - } - } return beforeConsume(message); } catch (InterruptedException e) { State state = getState(); @@ -546,22 +525,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { CompletableFuture> result = cancellationHandler.createFuture(); internalPinnedExecutor.execute(() -> { if (hasEnoughMessagesForBatchReceive()) { - MessagesImpl messages = getNewMessagesImpl(); - Message msgPeeked = incomingMessages.peek(); - while (msgPeeked != null && messages.canAdd(msgPeeked)) { - Message msg = incomingMessages.poll(); - if (msg != null) { - messageProcessed(msg); - if (!isValidConsumerEpoch(msg)) { - msgPeeked = incomingMessages.peek(); - continue; - } - Message interceptMsg = beforeConsume(msg); - messages.add(interceptMsg); - } - msgPeeked = incomingMessages.peek(); - } - result.complete(messages); + notifyPendingBatchReceivedCallBack(result); } else { expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); @@ -1225,6 +1189,10 @@ private void executeNotifyCallback(final MessageImpl message) { // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue internalPinnedExecutor.execute(() -> { + if (!isValidConsumerEpoch(message)) { + increaseAvailablePermits(cnx()); + return; + } if (hasNextPendingReceive()) { notifyPendingReceivedCallback(message, null); } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { @@ -1890,64 +1858,66 @@ public int numMessagesInQueue() { return incomingMessages.size(); } - @Override - public void redeliverUnacknowledgedMessages() { - // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive - // redeliverUnacknowledgedMessages and consumer have not be created and - // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch - // smaller than broker epoch forever. client will not receive message anymore. - // Second : we should synchronized `ClientCnx cnx = cnx()` to - // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker - synchronized (ConsumerImpl.this) { - ClientCnx cnx = cnx(); - // V1 don't support redeliverUnacknowledgedMessages - if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { - if ((getState() == State.Connecting)) { - log.warn("[{}] Client Connection needs to be established " - + "for redelivery of unacknowledged messages", this); - } else { - log.warn("[{}] Reconnecting the client to redeliver the messages.", this); - cnx.ctx().close(); - } + public CompletableFuture internalRedeliverUnacknowledgedMessages() { + return CompletableFuture.runAsync(() -> { + // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive + // redeliverUnacknowledgedMessages and consumer have not be created and then receive reconnect epoch + // change the broker is smaller than the client epoch, this will cause client epoch smaller + // than broker epoch forever. client will not receive message anymore. + // Second : we should synchronized `ClientCnx cnx = cnx()` to prevent use old cnx to + // send redeliverUnacknowledgedMessages to a old broker + synchronized (ConsumerImpl.this) { + ClientCnx cnx = cnx(); + // V1 don't support redeliverUnacknowledgedMessages + if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { + if ((getState() == State.Connecting)) { + log.warn("[{}] Client Connection needs to be established " + + "for redelivery of unacknowledged messages", this); + } else { + log.warn("[{}] Reconnecting the client to redeliver the messages.", this); + cnx.ctx().close(); + } - return; - } + return; + } - // clear local message - int currentSize = 0; - currentSize = incomingMessages.size(); - clearIncomingMessages(); - unAckedMessageTracker.clear(); - - // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it, - // we need to keep both epochs the same - if (conf.getSubscriptionType() == SubscriptionType.Failover - || conf.getSubscriptionType() == SubscriptionType.Exclusive) { - CONSUMER_EPOCH.incrementAndGet(this); - } - // is channel is connected, we should send redeliver command to broker - if (cnx != null && isConnected(cnx)) { - cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( - consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); - if (currentSize > 0) { - increaseAvailablePermits(cnx, currentSize); + // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it, + // we need to keep both epochs the same + if (conf.getSubscriptionType() == SubscriptionType.Failover + || conf.getSubscriptionType() == SubscriptionType.Exclusive) { + CONSUMER_EPOCH.incrementAndGet(this); } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, - consumerName, currentSize); + // clear local message + int currentSize = incomingMessages.size(); + clearIncomingMessages(); + unAckedMessageTracker.clear(); + // is channel is connected, we should send redeliver command to broker + if (cnx != null && isConnected(cnx)) { + cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( + consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); + if (currentSize > 0) { + increaseAvailablePermits(cnx, currentSize); + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, + consumerName, currentSize); + } + } else { + log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " + + "so don't need to send redeliver command to broker", this); } - } else { - log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " - + "so don't need to send redeliver command to broker", this); } - } + }, internalPinnedExecutor); } - public int clearIncomingMessagesAndGetMessageNumber() { - int messagesNumber = incomingMessages.size(); - clearIncomingMessages(); - unAckedMessageTracker.clear(); - return messagesNumber; + @SneakyThrows + @Override + public void redeliverUnacknowledgedMessages() { + try { + internalRedeliverUnacknowledgedMessages().get(); + } catch (ExecutionException e) { + throw e.getCause(); + } } @Override @@ -2007,7 +1977,7 @@ protected void updateAutoScaleReceiverQueueHint() { @Override protected void completeOpBatchReceive(OpBatchReceive op) { - notifyPendingBatchReceivedCallBack(op); + notifyPendingBatchReceivedCallBack(op.future); } private CompletableFuture> getRedeliveryMessageIdData(List messageIds) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6f9f51ff25343..b45d60b364ad0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -48,6 +48,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.SneakyThrows; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; @@ -334,14 +335,6 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } } - // If message consumer epoch is smaller than consumer epoch present that - // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid. - // so we should release this message and receive again - private boolean isValidConsumerEpoch(Message message) { - return isValidConsumerEpoch(((MessageImpl) (((TopicMessageImpl) message)) - .getMessage())); - } - @Override public int minReceiverQueueSize() { int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize); @@ -364,11 +357,6 @@ protected Message internalReceive() throws PulsarClientException { message = incomingMessages.take(); decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); - if (!isValidConsumerEpoch(message)) { - resumeReceivingFromPausedConsumersIfNeeded(); - message.release(); - return internalReceive(); - } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -380,8 +368,6 @@ protected Message internalReceive() throws PulsarClientException { @Override protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarClientException { Message message; - - long callTime = System.nanoTime(); try { if (incomingMessages.isEmpty()) { expectMoreIncomingMessages(); @@ -390,16 +376,6 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC if (message != null) { decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); - if (!isValidConsumerEpoch(message)) { - long executionTime = System.nanoTime() - callTime; - long timeoutInNanos = unit.toNanos(timeout); - if (executionTime >= timeoutInNanos) { - return null; - } else { - resumeReceivingFromPausedConsumersIfNeeded(); - return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS); - } - } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); } resumeReceivingFromPausedConsumersIfNeeded(); @@ -430,22 +406,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { CompletableFuture> result = cancellationHandler.createFuture(); internalPinnedExecutor.execute(() -> { if (hasEnoughMessagesForBatchReceive()) { - MessagesImpl messages = getNewMessagesImpl(); - Message msgPeeked = incomingMessages.peek(); - while (msgPeeked != null && messages.canAdd(msgPeeked)) { - Message msg = incomingMessages.poll(); - if (msg != null) { - decreaseIncomingMessageSize(msg); - if (!isValidConsumerEpoch(msg)) { - msgPeeked = incomingMessages.peek(); - continue; - } - Message interceptMsg = beforeConsume(msg); - messages.add(interceptMsg); - } - msgPeeked = incomingMessages.peek(); - } - result.complete(messages); + notifyPendingBatchReceivedCallBack(result); } else { expectMoreIncomingMessages(); OpBatchReceive opBatchReceive = OpBatchReceive.of(result); @@ -695,17 +656,24 @@ private ConsumerConfigurationData getInternalConsumerConfig() { return internalConsumerConfig; } + @SneakyThrows @Override public void redeliverUnacknowledgedMessages() { + List> futures = new ArrayList<>(consumers.size()); internalPinnedExecutor.execute(() -> { CONSUMER_EPOCH.incrementAndGet(this); consumers.values().stream().forEach(consumer -> { - consumer.redeliverUnacknowledgedMessages(); + futures.add(consumer.internalRedeliverUnacknowledgedMessages()); consumer.unAckedChunkedMessageIdSequenceMap.clear(); }); clearIncomingMessages(); unAckedMessageTracker.clear(); }); + try { + FutureUtil.waitForAll(futures).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } resumeReceivingFromPausedConsumersIfNeeded(); } @@ -740,7 +708,7 @@ protected void updateAutoScaleReceiverQueueHint() { @Override protected void completeOpBatchReceive(OpBatchReceive op) { - notifyPendingBatchReceivedCallBack(op); + notifyPendingBatchReceivedCallBack(op.future); resumeReceivingFromPausedConsumersIfNeeded(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java index 8c15d48ffa9d6..1ce0fec5836d1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java @@ -60,6 +60,7 @@ public UnAckedMessageRedeliveryTracker(PulsarClientImpl client, ConsumerBase @Override public void run(Timeout t) throws Exception { writeLock.lock(); + Set messageIds = null; try { HashSet headPartition = redeliveryTimePartitions.removeFirst(); if (!headPartition.isEmpty()) { @@ -71,9 +72,13 @@ public void run(Timeout t) throws Exception { } headPartition.clear(); redeliveryTimePartitions.addLast(headPartition); - triggerRedelivery(consumerBase); + messageIds = getRedeliveryMessages(consumerBase); } finally { writeLock.unlock(); + if (messageIds != null && !messageIds.isEmpty()) { + consumerBase.onAckTimeoutSend(messageIds); + consumerBase.redeliverUnacknowledgedMessages(messageIds); + } timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); } } @@ -93,35 +98,29 @@ private void addAckTimeoutMessages(UnackMessageIdWrapper messageIdWrapper) { } } - private void triggerRedelivery(ConsumerBase consumerBase) { + private Set getRedeliveryMessages(ConsumerBase consumerBase) { if (ackTimeoutMessages.isEmpty()) { - return; + return null; } Set messageIds = TL_MESSAGE_IDS_SET.get(); messageIds.clear(); - try { - long now = System.currentTimeMillis(); - ackTimeoutMessages.forEach((messageId, timestamp) -> { - if (timestamp <= now) { - addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); - messageIds.add(messageId); - } - }); - if (!messageIds.isEmpty()) { - log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); - Iterator iterator = messageIds.iterator(); - while (iterator.hasNext()) { - MessageId messageId = iterator.next(); - ackTimeoutMessages.remove(messageId); - } + long now = System.currentTimeMillis(); + ackTimeoutMessages.forEach((messageId, timestamp) -> { + if (timestamp <= now) { + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); + messageIds.add(messageId); } - } finally { - if (messageIds.size() > 0) { - consumerBase.onAckTimeoutSend(messageIds); - consumerBase.redeliverUnacknowledgedMessages(messageIds); + }); + if (!messageIds.isEmpty()) { + log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); + Iterator iterator = messageIds.iterator(); + while (iterator.hasNext()) { + MessageId messageId = iterator.next(); + ackTimeoutMessages.remove(messageId); } } + return messageIds; } @Override