From cbf5ac09807a98ba08cdfc09a5a96f6dbe969042 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 Apr 2024 16:17:05 +0800 Subject: [PATCH] [fix][broker] Avoid being stuck in 30+ seconds when closing the BrokerService Fixes https://github.com/apache/pulsar/issues/22569 ### Motivation `BrokerService#closeAsync` calls `unloadNamespaceBundlesGracefully` to unload namespaces gracefully. With extensible load manager, it eventually calls `TableViewLoadDataStoreImpl#validateProducer`: ``` BrokerService#unloadNamespaceBundlesGracefully ExtensibleLoadManagerWrapper#disableBroker ExtensibleLoadManagerImpl#disableBroker ServiceUnitStateChannelImpl#cleanOwnerships ServiceUnitStateChannelImpl#doCleanup TableViewLoadDataStoreImpl#removeAsync TableViewLoadDataStoreImpl#validateProducer ``` In `validateProducer`, if the producer is not connected, it will recreate the producer synchronously. However, since the state of `PulsarService` has already been changed to `Closing`, all connect or lookup requests will fail with `ServiceNotReady`. Then the client will retry until timeout. Besides, the unload operation could also trigger the reconnection because the extensible load manager sends the unload event to the `loadbalancer-service-unit-state` topic. ### Modifications The major fix: Before changing PulsarService's state to `Closing`, call `BrokerService#unloadNamespaceBundlesGracefully` first to make the load manager complete the unload operations first. Minor fixes: - Record the time when `LoadManager#disableBroker` is done. - Don't check if producer is disconnected because the producer could retry if it's disconnected. ### Verifications Add `ExtensibleLoadManagerCloseTest` to verify closing `PulsarService` won't take too much time. Here are some test results locally: ``` 2024-04-24T19:43:38,851 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3342, 3276, 3310] 2024-04-24T19:44:26,711 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3357, 3258, 3298] 2024-04-24T19:46:16,791 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3313, 3257, 3263] 2024-04-24T20:13:05,763 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3304, 3279, 3299] 2024-04-24T20:13:43,979 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3343, 3308, 3310] ``` As you can see, each broker takes only about 3 seconds to close due to `OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS` value added in https://github.com/apache/pulsar/pull/20315 --- .../apache/pulsar/broker/PulsarService.java | 1 + .../store/TableViewLoadDataStoreImpl.java | 6 +- .../pulsar/broker/service/BrokerService.java | 11 ++ .../ExtensibleLoadManagerCloseTest.java | 107 ++++++++++++++++++ 4 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 7613a13db22ded..38052b02c5cd05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -444,6 +444,7 @@ public CompletableFuture closeAsync() { return closeFuture; } LOG.info("Closing PulsarService"); + brokerService.unloadNamespaceBundlesGracefully(); state = State.Closing; // close the service in reverse order v.s. in which they are started diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index d916e917162230..81cf33b4a55d23 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -161,12 +161,8 @@ public synchronized void init() throws IOException { } private void validateProducer() { - if (producer == null || !producer.isConnected()) { + if (producer == null) { try { - if (producer != null) { - producer.close(); - } - producer = null; startProducer(); log.info("Restarted producer on {}", topic); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 295a9a2954126d..1f0cb12258e1de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -309,6 +309,7 @@ public class BrokerService implements Closeable { private Set brokerEntryPayloadProcessors; private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); + private volatile boolean unloaded = false; public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; @@ -926,9 +927,13 @@ public void unloadNamespaceBundlesGracefully() { } public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean closeWithoutWaitingClientDisconnect) { + if (unloaded) { + return; + } try { log.info("Unloading namespace-bundles..."); // make broker-node unavailable from the cluster + long disableBrokerStartTime = System.nanoTime(); if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { try { pulsar.getLoadManager().get().disableBroker(); @@ -937,6 +942,10 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl // still continue and release bundle ownership as broker's registration node doesn't exist. } } + double disableBrokerTimeSeconds = + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - disableBrokerStartTime)) + / 1000.0; + log.info("Disable broker in load manager completed in {} seconds", disableBrokerTimeSeconds); // unload all namespace-bundles gracefully long closeTopicsStartTime = System.nanoTime(); @@ -966,6 +975,8 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl } } catch (Exception e) { log.error("Failed to disable broker from loadbalancer list {}", e.getMessage(), e); + } finally { + unloaded = true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java new file mode 100644 index 00000000000000..41413f3e3a913a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class ExtensibleLoadManagerCloseTest { + + private static final String clusterName = "test"; + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); + private final List brokers = new ArrayList<>(); + private PulsarAdmin admin; + + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { + bk.start(); + for (int i = 0; i < 3; i++) { + final var broker = new PulsarService(brokerConfig()); + broker.start(); + brokers.add(broker); + } + admin = brokers.get(0).getAdminClient(); + admin.clusters().createCluster(clusterName, ClusterData.builder().build()); + admin.tenants().createTenant("public", TenantInfo.builder() + .allowedClusters(Collections.singleton(clusterName)).build()); + admin.namespaces().createNamespace("public/default"); + } + + + @AfterClass(alwaysRun = true, timeOut = 30000) + public void cleanup() throws Exception { + bk.stop(); + } + + private ServiceConfiguration brokerConfig() { + final var config = new ServiceConfiguration(); + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort()); + config.setManagedLedgerDefaultWriteQuorum(1); + config.setManagedLedgerDefaultAckQuorum(1); + config.setManagedLedgerDefaultEnsembleSize(1); + config.setDefaultNumberOfNamespaceBundles(16); + config.setLoadBalancerAutoBundleSplitEnabled(false); + config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(100); + return config; + } + + + @Test + public void testCloseAfterLoadingBundles() throws Exception { + final var topic = "test"; + admin.topics().createPartitionedTopic(topic, 20); + admin.lookups().lookupPartitionedTopic(topic); + final var client = PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build(); + final var producer = client.newProducer().topic(topic).create(); + producer.close(); + client.close(); + + final var closeTimeMsList = new ArrayList(); + for (var broker : brokers) { + final var startTimeMs = System.currentTimeMillis(); + broker.close(); + closeTimeMsList.add(System.currentTimeMillis() - startTimeMs); + } + log.info("Brokers close time: {}", closeTimeMsList); + for (var closeTimeMs : closeTimeMsList) { + Assert.assertTrue(closeTimeMs < 5000L); + } + } +}