From e6534f2b64150f5a66023e368be5075d7f59be55 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 26 Mar 2024 07:41:07 +0800 Subject: [PATCH] [fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283) (cherry picked from commit a52945b1c51fa874667eecb9fea9bf03e5d6153b) (cherry picked from commit bec3be20ff2c0700f65033a914862e9e87b6070d) --- .../pulsar/broker/service/ServerCnx.java | 16 ++- ...PersistentDispatcherMultipleConsumers.java | 8 +- .../pulsar/broker/service/ServerCnxTest.java | 5 +- ...ProducerConsumerMLInitializeDelayTest.java | 108 ++++++++++++++++++ 4 files changed, 131 insertions(+), 6 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 29ba7cb866ef2..489a5aaee93b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1196,10 +1196,20 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady, "Consumer is already present on the connection"); } else if (existingConsumerFuture.isCompletedExceptionally()){ + log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection," + + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true, - String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s", - remoteAddress, subscriptionName)); - consumers.remove(consumerId, existingConsumerFuture); + String.format("A failed consumer with id is already present on the connection." + + " consumerId: %s, remoteAddress: %s, subscription: %s", + consumerId, remoteAddress, subscriptionName)); + /** + * This future may was failed due to the client closed a in-progress subscribing. + * See {@link #handleCloseConsumer(CommandCloseConsumer)} + * Do not remove the failed future at current line, it will be removed after the progress of + * the previous subscribing is done. + * Before the previous subscribing is done, the new subscribe request will always fail. + * This mechanism is in order to prevent more complex logic to handle the race conditions. + */ commandSender.sendErrorResponse(requestId, error, "Consumer that failed is already present on the connection"); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2d5815337b2c5..c164abf905dd2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -177,9 +177,15 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } if (isConsumersExceededOnSubscription()) { - log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); + log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}", + name, consumer); return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit")); } + // This is not an expected scenario, it will never happen in expected. Just print a warn log if the unexpected + // scenario happens. See more detail: https://github.com/apache/pulsar/pull/22283. + if (consumerSet.contains(consumer)) { + log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer); + } consumerList.add(consumer); if (consumerList.size() > 1 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b8f3dd77b1cc1..202514f3e85b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3380,8 +3380,9 @@ public boolean isCompletedExceptionally() { }; // assert error response assertTrue(responseAssert.test(responseAssert)); - // assert consumer-delete event occur - assertEquals(1L, + // The delete event will only occur after the future is completed. + // assert consumer-delete event will not occur. + assertEquals(0L, deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count()); // Server will not close the connection assertTrue(channel.isOpen()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java new file mode 100644 index 0000000000000..ab4e063ae3d83 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.carrotsearch.hppc.ObjectSet; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setTopicLoadTimeoutSeconds(60 * 5); + } + + @Test(timeOut = 30 * 1000) + public void testConsumerListMatchesConsumerSet() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subName = "sub"; + final int clientOperationTimeout = 3; + final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000; + final int clientMaxBackoffSeconds = clientOperationTimeout * 2; + admin.topics().createNonPartitionedTopic(topicName); + // Create a client with a low operation timeout. + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .operationTimeout(clientOperationTimeout, TimeUnit.SECONDS) + .maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS) + .build(); + Consumer consumer = client.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + // Inject a delay for the initialization of ML, to make the consumer to register twice. + // Consumer register twice: the first will be timeout, and try again. + AtomicInteger delayTimes = new AtomicInteger(); + mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> { + if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) { + return delayTimes.incrementAndGet() == 1; + } + return false; + }); + admin.topics().unload(topicName); + // Verify: at last, "dispatcher.consumers.size" equals "dispatcher.consumerList.size". + Awaitility.await().atMost(Duration.ofSeconds(loadMLDelayMillis * 3)) + .ignoreExceptions().untilAsserted(() -> { + Dispatcher dispatcher = pulsar.getBrokerService() + .getTopic(topicName, false).join().get() + .getSubscription(subName).getDispatcher(); + ObjectSet consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet"); + List consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList"); + log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size()); + Assert.assertEquals(consumerList.size(), 1); + Assert.assertEquals(consumerSet.size(), 1); + }); + + // Verify: the topic can be deleted. + consumer.close(); + admin.topics().delete(topicName); + // cleanup. + client.close(); + } +}